PostgreSQL Source Code  git master
plpy_cursorobject.c
Go to the documentation of this file.
1 /*
2  * the PLyCursor class
3  *
4  * src/pl/plpython/plpy_cursorobject.c
5  */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "utils/memutils.h"
15 
16 #include "plpython.h"
17 
18 #include "plpy_cursorobject.h"
19 
20 #include "plpy_elog.h"
21 #include "plpy_main.h"
22 #include "plpy_planobject.h"
23 #include "plpy_procedure.h"
24 #include "plpy_resultobject.h"
25 #include "plpy_spi.h"
26 
27 
28 static PyObject *PLy_cursor_query(const char *query);
29 static void PLy_cursor_dealloc(PyObject *arg);
30 static PyObject *PLy_cursor_iternext(PyObject *self);
31 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
32 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33 
34 static char PLy_cursor_doc[] = {
35  "Wrapper around a PostgreSQL cursor"
36 };
37 
38 static PyMethodDef PLy_cursor_methods[] = {
39  {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
40  {"close", PLy_cursor_close, METH_NOARGS, NULL},
41  {NULL, NULL, 0, NULL}
42 };
43 
44 static PyTypeObject PLy_CursorType = {
45  PyVarObject_HEAD_INIT(NULL, 0)
46  "PLyCursor", /* tp_name */
47  sizeof(PLyCursorObject), /* tp_size */
48  0, /* tp_itemsize */
49 
50  /*
51  * methods
52  */
53  PLy_cursor_dealloc, /* tp_dealloc */
54  0, /* tp_print */
55  0, /* tp_getattr */
56  0, /* tp_setattr */
57  0, /* tp_compare */
58  0, /* tp_repr */
59  0, /* tp_as_number */
60  0, /* tp_as_sequence */
61  0, /* tp_as_mapping */
62  0, /* tp_hash */
63  0, /* tp_call */
64  0, /* tp_str */
65  0, /* tp_getattro */
66  0, /* tp_setattro */
67  0, /* tp_as_buffer */
68  Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER, /* tp_flags */
69  PLy_cursor_doc, /* tp_doc */
70  0, /* tp_traverse */
71  0, /* tp_clear */
72  0, /* tp_richcompare */
73  0, /* tp_weaklistoffset */
74  PyObject_SelfIter, /* tp_iter */
75  PLy_cursor_iternext, /* tp_iternext */
76  PLy_cursor_methods, /* tp_tpmethods */
77 };
78 
79 void
81 {
82  if (PyType_Ready(&PLy_CursorType) < 0)
83  elog(ERROR, "could not initialize PLy_CursorType");
84 }
85 
86 PyObject *
87 PLy_cursor(PyObject *self, PyObject *args)
88 {
89  char *query;
90  PyObject *plan;
91  PyObject *planargs = NULL;
92 
93  if (PyArg_ParseTuple(args, "s", &query))
94  return PLy_cursor_query(query);
95 
96  PyErr_Clear();
97 
98  if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
99  return PLy_cursor_plan(plan, planargs);
100 
101  PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
102  return NULL;
103 }
104 
105 
106 static PyObject *
107 PLy_cursor_query(const char *query)
108 {
111  volatile MemoryContext oldcontext;
112  volatile ResourceOwner oldowner;
113 
114  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
115  return NULL;
116  cursor->portalname = NULL;
117  cursor->closed = false;
119  "PL/Python cursor context",
121 
122  /* Initialize for converting result tuples to Python */
123  PLy_input_setup_func(&cursor->result, cursor->mcxt,
124  RECORDOID, -1,
125  exec_ctx->curr_proc);
126 
127  oldcontext = CurrentMemoryContext;
128  oldowner = CurrentResourceOwner;
129 
130  PLy_spi_subtransaction_begin(oldcontext, oldowner);
131 
132  PG_TRY();
133  {
134  SPIPlanPtr plan;
135  Portal portal;
136 
137  pg_verifymbstr(query, strlen(query), false);
138 
139  plan = SPI_prepare(query, 0, NULL);
140  if (plan == NULL)
141  elog(ERROR, "SPI_prepare failed: %s",
143 
144  portal = SPI_cursor_open(NULL, plan, NULL, NULL,
145  exec_ctx->curr_proc->fn_readonly);
146  SPI_freeplan(plan);
147 
148  if (portal == NULL)
149  elog(ERROR, "SPI_cursor_open() failed: %s",
151 
152  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
153 
154  PLy_spi_subtransaction_commit(oldcontext, oldowner);
155  }
156  PG_CATCH();
157  {
158  PLy_spi_subtransaction_abort(oldcontext, oldowner);
159  return NULL;
160  }
161  PG_END_TRY();
162 
163  Assert(cursor->portalname != NULL);
164  return (PyObject *) cursor;
165 }
166 
167 PyObject *
168 PLy_cursor_plan(PyObject *ob, PyObject *args)
169 {
171  volatile int nargs;
172  int i;
173  PLyPlanObject *plan;
175  volatile MemoryContext oldcontext;
176  volatile ResourceOwner oldowner;
177 
178  if (args)
179  {
180  if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
181  {
182  PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
183  return NULL;
184  }
185  nargs = PySequence_Length(args);
186  }
187  else
188  nargs = 0;
189 
190  plan = (PLyPlanObject *) ob;
191 
192  if (nargs != plan->nargs)
193  {
194  char *sv;
195  PyObject *so = PyObject_Str(args);
196 
197  if (!so)
198  PLy_elog(ERROR, "could not execute plan");
199  sv = PyString_AsString(so);
200  PLy_exception_set_plural(PyExc_TypeError,
201  "Expected sequence of %d argument, got %d: %s",
202  "Expected sequence of %d arguments, got %d: %s",
203  plan->nargs,
204  plan->nargs, nargs, sv);
205  Py_DECREF(so);
206 
207  return NULL;
208  }
209 
210  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
211  return NULL;
212  cursor->portalname = NULL;
213  cursor->closed = false;
215  "PL/Python cursor context",
217 
218  /* Initialize for converting result tuples to Python */
219  PLy_input_setup_func(&cursor->result, cursor->mcxt,
220  RECORDOID, -1,
221  exec_ctx->curr_proc);
222 
223  oldcontext = CurrentMemoryContext;
224  oldowner = CurrentResourceOwner;
225 
226  PLy_spi_subtransaction_begin(oldcontext, oldowner);
227 
228  PG_TRY();
229  {
230  Portal portal;
231  char *volatile nulls;
232  volatile int j;
233 
234  if (nargs > 0)
235  nulls = palloc(nargs * sizeof(char));
236  else
237  nulls = NULL;
238 
239  for (j = 0; j < nargs; j++)
240  {
241  PLyObToDatum *arg = &plan->args[j];
242  PyObject *elem;
243 
244  elem = PySequence_GetItem(args, j);
245  PG_TRY();
246  {
247  bool isnull;
248 
249  plan->values[j] = PLy_output_convert(arg, elem, &isnull);
250  nulls[j] = isnull ? 'n' : ' ';
251  }
252  PG_CATCH();
253  {
254  Py_DECREF(elem);
255  PG_RE_THROW();
256  }
257  PG_END_TRY();
258  Py_DECREF(elem);
259  }
260 
261  portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
262  exec_ctx->curr_proc->fn_readonly);
263  if (portal == NULL)
264  elog(ERROR, "SPI_cursor_open() failed: %s",
266 
267  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
268 
269  PLy_spi_subtransaction_commit(oldcontext, oldowner);
270  }
271  PG_CATCH();
272  {
273  int k;
274 
275  /* cleanup plan->values array */
276  for (k = 0; k < nargs; k++)
277  {
278  if (!plan->args[k].typbyval &&
279  (plan->values[k] != PointerGetDatum(NULL)))
280  {
281  pfree(DatumGetPointer(plan->values[k]));
282  plan->values[k] = PointerGetDatum(NULL);
283  }
284  }
285 
286  Py_DECREF(cursor);
287 
288  PLy_spi_subtransaction_abort(oldcontext, oldowner);
289  return NULL;
290  }
291  PG_END_TRY();
292 
293  for (i = 0; i < nargs; i++)
294  {
295  if (!plan->args[i].typbyval &&
296  (plan->values[i] != PointerGetDatum(NULL)))
297  {
298  pfree(DatumGetPointer(plan->values[i]));
299  plan->values[i] = PointerGetDatum(NULL);
300  }
301  }
302 
303  Assert(cursor->portalname != NULL);
304  return (PyObject *) cursor;
305 }
306 
307 static void
309 {
311  Portal portal;
312 
313  cursor = (PLyCursorObject *) arg;
314 
315  if (!cursor->closed)
316  {
317  portal = GetPortalByName(cursor->portalname);
318 
319  if (PortalIsValid(portal))
320  SPI_cursor_close(portal);
321  cursor->closed = true;
322  }
323  if (cursor->mcxt)
324  {
325  MemoryContextDelete(cursor->mcxt);
326  cursor->mcxt = NULL;
327  }
328  arg->ob_type->tp_free(arg);
329 }
330 
331 static PyObject *
332 PLy_cursor_iternext(PyObject *self)
333 {
335  PyObject *ret;
337  volatile MemoryContext oldcontext;
338  volatile ResourceOwner oldowner;
339  Portal portal;
340 
341  cursor = (PLyCursorObject *) self;
342 
343  if (cursor->closed)
344  {
345  PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
346  return NULL;
347  }
348 
349  portal = GetPortalByName(cursor->portalname);
350  if (!PortalIsValid(portal))
351  {
352  PLy_exception_set(PyExc_ValueError,
353  "iterating a cursor in an aborted subtransaction");
354  return NULL;
355  }
356 
357  oldcontext = CurrentMemoryContext;
358  oldowner = CurrentResourceOwner;
359 
360  PLy_spi_subtransaction_begin(oldcontext, oldowner);
361 
362  PG_TRY();
363  {
364  SPI_cursor_fetch(portal, true, 1);
365  if (SPI_processed == 0)
366  {
367  PyErr_SetNone(PyExc_StopIteration);
368  ret = NULL;
369  }
370  else
371  {
373  exec_ctx->curr_proc);
374 
375  ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
377  }
378 
380 
381  PLy_spi_subtransaction_commit(oldcontext, oldowner);
382  }
383  PG_CATCH();
384  {
385  PLy_spi_subtransaction_abort(oldcontext, oldowner);
386  return NULL;
387  }
388  PG_END_TRY();
389 
390  return ret;
391 }
392 
393 static PyObject *
394 PLy_cursor_fetch(PyObject *self, PyObject *args)
395 {
397  int count;
398  PLyResultObject *ret;
400  volatile MemoryContext oldcontext;
401  volatile ResourceOwner oldowner;
402  Portal portal;
403 
404  if (!PyArg_ParseTuple(args, "i:fetch", &count))
405  return NULL;
406 
407  cursor = (PLyCursorObject *) self;
408 
409  if (cursor->closed)
410  {
411  PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
412  return NULL;
413  }
414 
415  portal = GetPortalByName(cursor->portalname);
416  if (!PortalIsValid(portal))
417  {
418  PLy_exception_set(PyExc_ValueError,
419  "iterating a cursor in an aborted subtransaction");
420  return NULL;
421  }
422 
423  ret = (PLyResultObject *) PLy_result_new();
424  if (ret == NULL)
425  return NULL;
426 
427  oldcontext = CurrentMemoryContext;
428  oldowner = CurrentResourceOwner;
429 
430  PLy_spi_subtransaction_begin(oldcontext, oldowner);
431 
432  PG_TRY();
433  {
434  SPI_cursor_fetch(portal, true, count);
435 
436  Py_DECREF(ret->status);
437  ret->status = PyInt_FromLong(SPI_OK_FETCH);
438 
439  Py_DECREF(ret->nrows);
440  ret->nrows = (SPI_processed > (uint64) LONG_MAX) ?
441  PyFloat_FromDouble((double) SPI_processed) :
442  PyInt_FromLong((long) SPI_processed);
443 
444  if (SPI_processed != 0)
445  {
446  uint64 i;
447 
448  /*
449  * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
450  * and list indices; so we cannot support a result larger than
451  * PY_SSIZE_T_MAX.
452  */
453  if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
454  ereport(ERROR,
455  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
456  errmsg("query result has too many rows to fit in a Python list")));
457 
458  Py_DECREF(ret->rows);
459  ret->rows = PyList_New(SPI_processed);
460  if (!ret->rows)
461  {
462  Py_DECREF(ret);
463  ret = NULL;
464  }
465  else
466  {
468  exec_ctx->curr_proc);
469 
470  for (i = 0; i < SPI_processed; i++)
471  {
472  PyObject *row = PLy_input_from_tuple(&cursor->result,
473  SPI_tuptable->vals[i],
475 
476  PyList_SetItem(ret->rows, i, row);
477  }
478  }
479  }
480 
482 
483  PLy_spi_subtransaction_commit(oldcontext, oldowner);
484  }
485  PG_CATCH();
486  {
487  PLy_spi_subtransaction_abort(oldcontext, oldowner);
488  return NULL;
489  }
490  PG_END_TRY();
491 
492  return (PyObject *) ret;
493 }
494 
495 static PyObject *
496 PLy_cursor_close(PyObject *self, PyObject *unused)
497 {
499 
500  if (!cursor->closed)
501  {
502  Portal portal = GetPortalByName(cursor->portalname);
503 
504  if (!PortalIsValid(portal))
505  {
506  PLy_exception_set(PyExc_ValueError,
507  "closing a cursor in an aborted subtransaction");
508  return NULL;
509  }
510 
511  SPI_cursor_close(portal);
512  cursor->closed = true;
513  }
514 
515  Py_RETURN_NONE;
516 }
static PyMethodDef PLy_cursor_methods[]
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
void PLy_exception_set_plural(PyObject *exc, const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: plpy_elog.c:514
Datum PLy_output_convert(PLyObToDatum *arg, PyObject *val, bool *isnull)
Definition: plpy_typeio.c:123
void PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:506
PyObject * PLy_cursor(PyObject *self, PyObject *args)
PyObject_HEAD char * portalname
PyObject * PLy_input_from_tuple(PLyDatumToOb *arg, HeapTuple tuple, TupleDesc desc)
Definition: plpy_typeio.c:137
#define PyVarObject_HEAD_INIT(type, size)
Definition: plpython.h:111
#define PointerGetDatum(X)
Definition: postgres.h:562
Portal GetPortalByName(const char *name)
Definition: portalmem.c:129
ResourceOwner CurrentResourceOwner
Definition: resowner.c:138
SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes)
Definition: spi.c:488
SPITupleTable * SPI_tuptable
Definition: spi.c:41
int errcode(int sqlerrcode)
Definition: elog.c:575
PyObject * PLy_exc_error
Definition: plpy_elog.c:19
Portal SPI_cursor_open(const char *name, SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only)
Definition: spi.c:1037
PLyExecutionContext * PLy_current_execution_context(void)
Definition: plpy_main.c:409
void PLy_exception_set(PyObject *exc, const char *fmt,...)
Definition: plpy_elog.c:500
HeapTuple * vals
Definition: spi.h:28
PLyDatumToOb result
uint64 SPI_processed
Definition: spi.c:39
static PyObject * PLy_cursor_query(const char *query)
int SPI_result
Definition: spi.c:42
static PyObject * PLy_cursor_close(PyObject *self, PyObject *unused)
void PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:489
void pfree(void *pointer)
Definition: mcxt.c:949
const char * name
Definition: portal.h:117
static PyObject * PLy_cursor_iternext(PyObject *self)
#define ERROR
Definition: elog.h:43
static void PLy_cursor_dealloc(PyObject *arg)
void PLy_cursor_init_type(void)
MemoryContext mcxt
const char * SPI_result_code_string(int code)
Definition: spi.c:1521
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:170
static PyObject * PLy_cursor_fetch(PyObject *self, PyObject *args)
void PLy_input_setup_tuple(PLyDatumToOb *arg, TupleDesc desc, PLyProcedure *proc)
Definition: plpy_typeio.c:168
PLyObToDatum * args
void PLy_input_setup_func(PLyDatumToOb *arg, MemoryContext arg_mcxt, Oid typeOid, int32 typmod, PLyProcedure *proc)
Definition: plpy_typeio.c:421
Definition: type.h:124
PyObject_HEAD SPIPlanPtr plan
#define RECORDOID
Definition: pg_type.h:680
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
#define ereport(elevel, rest)
Definition: elog.h:122
static char PLy_cursor_doc[]
MemoryContext TopMemoryContext
Definition: mcxt.c:43
PLyProcedure * curr_proc
Definition: plpy_main.h:20
void SPI_freetuptable(SPITupleTable *tuptable)
Definition: spi.c:978
#define PLy_elog
Definition: plpy_elog.h:36
#define PortalIsValid(p)
Definition: portal.h:199
#define SPI_OK_FETCH
Definition: spi.h:52
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:342
TupleDesc tupdesc
Definition: spi.h:27
#define PG_CATCH()
Definition: elog.h:293
#define Assert(condition)
Definition: c.h:670
struct PLyCursorObject PLyCursorObject
#define PG_RE_THROW()
Definition: elog.h:314
#define PY_SSIZE_T_MAX
Definition: plpython.h:66
#define DatumGetPointer(X)
Definition: postgres.h:555
int SPI_freeplan(SPIPlanPtr plan)
Definition: spi.c:615
void SPI_cursor_close(Portal portal)
Definition: spi.c:1411
PyObject * PLy_cursor_plan(PyObject *ob, PyObject *args)
PyObject * PLy_result_new(void)
void PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:497
void * palloc(Size size)
Definition: mcxt.c:848
int errmsg(const char *fmt,...)
Definition: elog.c:797
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1063
int i
void * arg
void SPI_cursor_fetch(Portal portal, bool forward, long count)
Definition: spi.c:1355
PyObject_HEAD PyObject * nrows
bool pg_verifymbstr(const char *mbstr, int len, bool noError)
Definition: wchar.c:1866
#define elog
Definition: elog.h:219
#define PG_TRY()
Definition: elog.h:284
static PyTypeObject PLy_CursorType
#define PG_END_TRY()
Definition: elog.h:300