PostgreSQL Source Code  git master
plpy_cursorobject.c
Go to the documentation of this file.
1 /*
2  * the PLyCursor class
3  *
4  * src/pl/plpython/plpy_cursorobject.c
5  */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "utils/memutils.h"
15 
16 #include "plpython.h"
17 
18 #include "plpy_cursorobject.h"
19 
20 #include "plpy_elog.h"
21 #include "plpy_main.h"
22 #include "plpy_planobject.h"
23 #include "plpy_procedure.h"
24 #include "plpy_resultobject.h"
25 #include "plpy_spi.h"
26 
27 
28 static PyObject *PLy_cursor_query(const char *query);
29 static void PLy_cursor_dealloc(PyObject *arg);
30 static PyObject *PLy_cursor_iternext(PyObject *self);
31 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
32 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
33 
/* Documentation string installed as tp_doc of PLy_CursorType */
static char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
37 
38 static PyMethodDef PLy_cursor_methods[] = {
39  {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
40  {"close", PLy_cursor_close, METH_NOARGS, NULL},
41  {NULL, NULL, 0, NULL}
42 };
43 
44 static PyTypeObject PLy_CursorType = {
45  PyVarObject_HEAD_INIT(NULL, 0)
46  .tp_name = "PLyCursor",
47  .tp_basicsize = sizeof(PLyCursorObject),
48  .tp_dealloc = PLy_cursor_dealloc,
49  .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,
50  .tp_doc = PLy_cursor_doc,
51  .tp_iter = PyObject_SelfIter,
52  .tp_iternext = PLy_cursor_iternext,
53  .tp_methods = PLy_cursor_methods,
54 };
55 
56 void
58 {
59  if (PyType_Ready(&PLy_CursorType) < 0)
60  elog(ERROR, "could not initialize PLy_CursorType");
61 }
62 
63 PyObject *
64 PLy_cursor(PyObject *self, PyObject *args)
65 {
66  char *query;
67  PyObject *plan;
68  PyObject *planargs = NULL;
69 
70  if (PyArg_ParseTuple(args, "s", &query))
71  return PLy_cursor_query(query);
72 
73  PyErr_Clear();
74 
75  if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
76  return PLy_cursor_plan(plan, planargs);
77 
78  PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
79  return NULL;
80 }
81 
82 
83 static PyObject *
84 PLy_cursor_query(const char *query)
85 {
88  volatile MemoryContext oldcontext;
89  volatile ResourceOwner oldowner;
90 
91  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
92  return NULL;
93  cursor->portalname = NULL;
94  cursor->closed = false;
96  "PL/Python cursor context",
98 
99  /* Initialize for converting result tuples to Python */
100  PLy_input_setup_func(&cursor->result, cursor->mcxt,
101  RECORDOID, -1,
102  exec_ctx->curr_proc);
103 
104  oldcontext = CurrentMemoryContext;
105  oldowner = CurrentResourceOwner;
106 
107  PLy_spi_subtransaction_begin(oldcontext, oldowner);
108 
109  PG_TRY();
110  {
111  SPIPlanPtr plan;
112  Portal portal;
113 
114  pg_verifymbstr(query, strlen(query), false);
115 
116  plan = SPI_prepare(query, 0, NULL);
117  if (plan == NULL)
118  elog(ERROR, "SPI_prepare failed: %s",
120 
121  portal = SPI_cursor_open(NULL, plan, NULL, NULL,
122  exec_ctx->curr_proc->fn_readonly);
123  SPI_freeplan(plan);
124 
125  if (portal == NULL)
126  elog(ERROR, "SPI_cursor_open() failed: %s",
128 
129  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
130 
131  PinPortal(portal);
132 
133  PLy_spi_subtransaction_commit(oldcontext, oldowner);
134  }
135  PG_CATCH();
136  {
137  PLy_spi_subtransaction_abort(oldcontext, oldowner);
138  return NULL;
139  }
140  PG_END_TRY();
141 
142  Assert(cursor->portalname != NULL);
143  return (PyObject *) cursor;
144 }
145 
146 PyObject *
147 PLy_cursor_plan(PyObject *ob, PyObject *args)
148 {
150  volatile int nargs;
151  int i;
152  PLyPlanObject *plan;
154  volatile MemoryContext oldcontext;
155  volatile ResourceOwner oldowner;
156 
157  if (args)
158  {
159  if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
160  {
161  PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
162  return NULL;
163  }
164  nargs = PySequence_Length(args);
165  }
166  else
167  nargs = 0;
168 
169  plan = (PLyPlanObject *) ob;
170 
171  if (nargs != plan->nargs)
172  {
173  char *sv;
174  PyObject *so = PyObject_Str(args);
175 
176  if (!so)
177  PLy_elog(ERROR, "could not execute plan");
178  sv = PyString_AsString(so);
179  PLy_exception_set_plural(PyExc_TypeError,
180  "Expected sequence of %d argument, got %d: %s",
181  "Expected sequence of %d arguments, got %d: %s",
182  plan->nargs,
183  plan->nargs, nargs, sv);
184  Py_DECREF(so);
185 
186  return NULL;
187  }
188 
189  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
190  return NULL;
191  cursor->portalname = NULL;
192  cursor->closed = false;
194  "PL/Python cursor context",
196 
197  /* Initialize for converting result tuples to Python */
198  PLy_input_setup_func(&cursor->result, cursor->mcxt,
199  RECORDOID, -1,
200  exec_ctx->curr_proc);
201 
202  oldcontext = CurrentMemoryContext;
203  oldowner = CurrentResourceOwner;
204 
205  PLy_spi_subtransaction_begin(oldcontext, oldowner);
206 
207  PG_TRY();
208  {
209  Portal portal;
210  char *volatile nulls;
211  volatile int j;
212 
213  if (nargs > 0)
214  nulls = palloc(nargs * sizeof(char));
215  else
216  nulls = NULL;
217 
218  for (j = 0; j < nargs; j++)
219  {
220  PLyObToDatum *arg = &plan->args[j];
221  PyObject *elem;
222 
223  elem = PySequence_GetItem(args, j);
224  PG_TRY();
225  {
226  bool isnull;
227 
228  plan->values[j] = PLy_output_convert(arg, elem, &isnull);
229  nulls[j] = isnull ? 'n' : ' ';
230  }
231  PG_CATCH();
232  {
233  Py_DECREF(elem);
234  PG_RE_THROW();
235  }
236  PG_END_TRY();
237  Py_DECREF(elem);
238  }
239 
240  portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
241  exec_ctx->curr_proc->fn_readonly);
242  if (portal == NULL)
243  elog(ERROR, "SPI_cursor_open() failed: %s",
245 
246  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
247 
248  PinPortal(portal);
249 
250  PLy_spi_subtransaction_commit(oldcontext, oldowner);
251  }
252  PG_CATCH();
253  {
254  int k;
255 
256  /* cleanup plan->values array */
257  for (k = 0; k < nargs; k++)
258  {
259  if (!plan->args[k].typbyval &&
260  (plan->values[k] != PointerGetDatum(NULL)))
261  {
262  pfree(DatumGetPointer(plan->values[k]));
263  plan->values[k] = PointerGetDatum(NULL);
264  }
265  }
266 
267  Py_DECREF(cursor);
268 
269  PLy_spi_subtransaction_abort(oldcontext, oldowner);
270  return NULL;
271  }
272  PG_END_TRY();
273 
274  for (i = 0; i < nargs; i++)
275  {
276  if (!plan->args[i].typbyval &&
277  (plan->values[i] != PointerGetDatum(NULL)))
278  {
279  pfree(DatumGetPointer(plan->values[i]));
280  plan->values[i] = PointerGetDatum(NULL);
281  }
282  }
283 
284  Assert(cursor->portalname != NULL);
285  return (PyObject *) cursor;
286 }
287 
288 static void
290 {
292  Portal portal;
293 
294  cursor = (PLyCursorObject *) arg;
295 
296  if (!cursor->closed)
297  {
298  portal = GetPortalByName(cursor->portalname);
299 
300  if (PortalIsValid(portal))
301  {
302  UnpinPortal(portal);
303  SPI_cursor_close(portal);
304  }
305  cursor->closed = true;
306  }
307  if (cursor->mcxt)
308  {
309  MemoryContextDelete(cursor->mcxt);
310  cursor->mcxt = NULL;
311  }
312  arg->ob_type->tp_free(arg);
313 }
314 
315 static PyObject *
316 PLy_cursor_iternext(PyObject *self)
317 {
319  PyObject *ret;
321  volatile MemoryContext oldcontext;
322  volatile ResourceOwner oldowner;
323  Portal portal;
324 
325  cursor = (PLyCursorObject *) self;
326 
327  if (cursor->closed)
328  {
329  PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
330  return NULL;
331  }
332 
333  portal = GetPortalByName(cursor->portalname);
334  if (!PortalIsValid(portal))
335  {
336  PLy_exception_set(PyExc_ValueError,
337  "iterating a cursor in an aborted subtransaction");
338  return NULL;
339  }
340 
341  oldcontext = CurrentMemoryContext;
342  oldowner = CurrentResourceOwner;
343 
344  PLy_spi_subtransaction_begin(oldcontext, oldowner);
345 
346  PG_TRY();
347  {
348  SPI_cursor_fetch(portal, true, 1);
349  if (SPI_processed == 0)
350  {
351  PyErr_SetNone(PyExc_StopIteration);
352  ret = NULL;
353  }
354  else
355  {
357  exec_ctx->curr_proc);
358 
359  ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
360  SPI_tuptable->tupdesc, true);
361  }
362 
364 
365  PLy_spi_subtransaction_commit(oldcontext, oldowner);
366  }
367  PG_CATCH();
368  {
369  PLy_spi_subtransaction_abort(oldcontext, oldowner);
370  return NULL;
371  }
372  PG_END_TRY();
373 
374  return ret;
375 }
376 
377 static PyObject *
378 PLy_cursor_fetch(PyObject *self, PyObject *args)
379 {
381  int count;
382  PLyResultObject *ret;
384  volatile MemoryContext oldcontext;
385  volatile ResourceOwner oldowner;
386  Portal portal;
387 
388  if (!PyArg_ParseTuple(args, "i:fetch", &count))
389  return NULL;
390 
391  cursor = (PLyCursorObject *) self;
392 
393  if (cursor->closed)
394  {
395  PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
396  return NULL;
397  }
398 
399  portal = GetPortalByName(cursor->portalname);
400  if (!PortalIsValid(portal))
401  {
402  PLy_exception_set(PyExc_ValueError,
403  "iterating a cursor in an aborted subtransaction");
404  return NULL;
405  }
406 
407  ret = (PLyResultObject *) PLy_result_new();
408  if (ret == NULL)
409  return NULL;
410 
411  oldcontext = CurrentMemoryContext;
412  oldowner = CurrentResourceOwner;
413 
414  PLy_spi_subtransaction_begin(oldcontext, oldowner);
415 
416  PG_TRY();
417  {
418  SPI_cursor_fetch(portal, true, count);
419 
420  Py_DECREF(ret->status);
421  ret->status = PyInt_FromLong(SPI_OK_FETCH);
422 
423  Py_DECREF(ret->nrows);
424  ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
425 
426  if (SPI_processed != 0)
427  {
428  uint64 i;
429 
430  /*
431  * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
432  * and list indices; so we cannot support a result larger than
433  * PY_SSIZE_T_MAX.
434  */
435  if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
436  ereport(ERROR,
437  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
438  errmsg("query result has too many rows to fit in a Python list")));
439 
440  Py_DECREF(ret->rows);
441  ret->rows = PyList_New(SPI_processed);
442  if (!ret->rows)
443  {
444  Py_DECREF(ret);
445  ret = NULL;
446  }
447  else
448  {
450  exec_ctx->curr_proc);
451 
452  for (i = 0; i < SPI_processed; i++)
453  {
454  PyObject *row = PLy_input_from_tuple(&cursor->result,
455  SPI_tuptable->vals[i],
457  true);
458 
459  PyList_SetItem(ret->rows, i, row);
460  }
461  }
462  }
463 
465 
466  PLy_spi_subtransaction_commit(oldcontext, oldowner);
467  }
468  PG_CATCH();
469  {
470  PLy_spi_subtransaction_abort(oldcontext, oldowner);
471  return NULL;
472  }
473  PG_END_TRY();
474 
475  return (PyObject *) ret;
476 }
477 
478 static PyObject *
479 PLy_cursor_close(PyObject *self, PyObject *unused)
480 {
482 
483  if (!cursor->closed)
484  {
485  Portal portal = GetPortalByName(cursor->portalname);
486 
487  if (!PortalIsValid(portal))
488  {
489  PLy_exception_set(PyExc_ValueError,
490  "closing a cursor in an aborted subtransaction");
491  return NULL;
492  }
493 
494  UnpinPortal(portal);
495  SPI_cursor_close(portal);
496  cursor->closed = true;
497  }
498 
499  Py_RETURN_NONE;
500 }
void UnpinPortal(Portal portal)
Definition: portalmem.c:377
static PyMethodDef PLy_cursor_methods[]
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:211
void PLy_exception_set_plural(PyObject *exc, const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: plpy_elog.c:516
#define AllocSetContextCreate
Definition: memutils.h:170
Datum PLy_output_convert(PLyObToDatum *arg, PyObject *val, bool *isnull)
Definition: plpy_typeio.c:123
void PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:508
PyObject * PLy_cursor(PyObject *self, PyObject *args)
PyObject_HEAD char * portalname
#define PyVarObject_HEAD_INIT(type, size)
Definition: plpython.h:113
#define PointerGetDatum(X)
Definition: postgres.h:556
Portal GetPortalByName(const char *name)
Definition: portalmem.c:130
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes)
Definition: spi.c:674
SPITupleTable * SPI_tuptable
Definition: spi.c:46
int errcode(int sqlerrcode)
Definition: elog.c:570
PyObject * PLy_exc_error
Definition: plpy_elog.c:19
Portal SPI_cursor_open(const char *name, SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only)
Definition: spi.c:1221
PLyExecutionContext * PLy_current_execution_context(void)
Definition: plpy_main.c:414
void PLy_exception_set(PyObject *exc, const char *fmt,...)
Definition: plpy_elog.c:502
HeapTuple * vals
Definition: spi.h:26
PLyDatumToOb result
uint64 SPI_processed
Definition: spi.c:45
static PyObject * PLy_cursor_query(const char *query)
int SPI_result
Definition: spi.c:47
static PyObject * PLy_cursor_close(PyObject *self, PyObject *unused)
void PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:491
void pfree(void *pointer)
Definition: mcxt.c:1056
const char * name
Definition: portal.h:117
static PyObject * PLy_cursor_iternext(PyObject *self)
#define ERROR
Definition: elog.h:43
static void PLy_cursor_dealloc(PyObject *arg)
void PLy_cursor_init_type(void)
MemoryContext mcxt
#define PLy_elog
const char * SPI_result_code_string(int code)
Definition: spi.c:1705
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:192
static PyObject * PLy_cursor_fetch(PyObject *self, PyObject *args)
void PLy_input_setup_tuple(PLyDatumToOb *arg, TupleDesc desc, PLyProcedure *proc)
Definition: plpy_typeio.c:168
void PinPortal(Portal portal)
Definition: portalmem.c:368
PLyObToDatum * args
void PLy_input_setup_func(PLyDatumToOb *arg, MemoryContext arg_mcxt, Oid typeOid, int32 typmod, PLyProcedure *proc)
Definition: plpy_typeio.c:421
Definition: type.h:130
PyObject_HEAD SPIPlanPtr plan
MemoryContext CurrentMemoryContext
Definition: mcxt.c:38
#define ereport(elevel, rest)
Definition: elog.h:141
static char PLy_cursor_doc[]
MemoryContext TopMemoryContext
Definition: mcxt.c:44
PLyProcedure * curr_proc
Definition: plpy_main.h:20
void SPI_freetuptable(SPITupleTable *tuptable)
Definition: spi.c:1162
#define PortalIsValid(p)
Definition: portal.h:201
#define SPI_OK_FETCH
Definition: spi.h:55
TupleDesc tupdesc
Definition: spi.h:25
#define PG_CATCH()
Definition: elog.h:310
#define Assert(condition)
Definition: c.h:732
struct PLyCursorObject PLyCursorObject
PyObject * PLy_input_from_tuple(PLyDatumToOb *arg, HeapTuple tuple, TupleDesc desc, bool include_generated)
Definition: plpy_typeio.c:137
#define PG_RE_THROW()
Definition: elog.h:331
#define PY_SSIZE_T_MAX
Definition: plpython.h:68
#define DatumGetPointer(X)
Definition: postgres.h:549
int SPI_freeplan(SPIPlanPtr plan)
Definition: spi.c:801
void SPI_cursor_close(Portal portal)
Definition: spi.c:1595
PyObject * PLy_cursor_plan(PyObject *ob, PyObject *args)
PyObject * PLy_result_new(void)
void PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:499
void * palloc(Size size)
Definition: mcxt.c:949
int errmsg(const char *fmt,...)
Definition: elog.c:784
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1173
#define elog(elevel,...)
Definition: elog.h:226
int i
void * arg
void SPI_cursor_fetch(Portal portal, bool forward, long count)
Definition: spi.c:1539
PyObject_HEAD PyObject * nrows
bool pg_verifymbstr(const char *mbstr, int len, bool noError)
Definition: wchar.c:1914
#define PG_TRY()
Definition: elog.h:301
static PyTypeObject PLy_CursorType
#define PG_END_TRY()
Definition: elog.h:317