PostgreSQL Source Code  git master
plpy_cursorobject.c
Go to the documentation of this file.
1 /*
2  * the PLyCursor class
3  *
4  * src/pl/plpython/plpy_cursorobject.c
5  */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "access/xact.h"
12 #include "catalog/pg_type.h"
13 #include "mb/pg_wchar.h"
14 #include "plpy_cursorobject.h"
15 #include "plpy_elog.h"
16 #include "plpy_main.h"
17 #include "plpy_planobject.h"
18 #include "plpy_procedure.h"
19 #include "plpy_resultobject.h"
20 #include "plpy_spi.h"
21 #include "plpython.h"
22 #include "utils/memutils.h"
23 
24 static PyObject *PLy_cursor_query(const char *query);
25 static void PLy_cursor_dealloc(PyObject *arg);
26 static PyObject *PLy_cursor_iternext(PyObject *self);
27 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
28 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
29 
30 static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
31 
32 static PyMethodDef PLy_cursor_methods[] = {
33  {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
34  {"close", PLy_cursor_close, METH_NOARGS, NULL},
35  {NULL, NULL, 0, NULL}
36 };
37 
38 static PyTypeObject PLy_CursorType = {
39  PyVarObject_HEAD_INIT(NULL, 0)
40  .tp_name = "PLyCursor",
41  .tp_basicsize = sizeof(PLyCursorObject),
42  .tp_dealloc = PLy_cursor_dealloc,
43  .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
44  .tp_doc = PLy_cursor_doc,
45  .tp_iter = PyObject_SelfIter,
46  .tp_iternext = PLy_cursor_iternext,
47  .tp_methods = PLy_cursor_methods,
48 };
49 
50 void
52 {
53  if (PyType_Ready(&PLy_CursorType) < 0)
54  elog(ERROR, "could not initialize PLy_CursorType");
55 }
56 
57 PyObject *
58 PLy_cursor(PyObject *self, PyObject *args)
59 {
60  char *query;
61  PyObject *plan;
62  PyObject *planargs = NULL;
63 
64  if (PyArg_ParseTuple(args, "s", &query))
65  return PLy_cursor_query(query);
66 
67  PyErr_Clear();
68 
69  if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
70  return PLy_cursor_plan(plan, planargs);
71 
72  PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
73  return NULL;
74 }
75 
76 
77 static PyObject *
78 PLy_cursor_query(const char *query)
79 {
82  volatile MemoryContext oldcontext;
83  volatile ResourceOwner oldowner;
84 
85  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
86  return NULL;
87  cursor->portalname = NULL;
88  cursor->closed = false;
90  "PL/Python cursor context",
92 
93  /* Initialize for converting result tuples to Python */
94  PLy_input_setup_func(&cursor->result, cursor->mcxt,
95  RECORDOID, -1,
96  exec_ctx->curr_proc);
97 
98  oldcontext = CurrentMemoryContext;
99  oldowner = CurrentResourceOwner;
100 
101  PLy_spi_subtransaction_begin(oldcontext, oldowner);
102 
103  PG_TRY();
104  {
106  Portal portal;
107 
108  pg_verifymbstr(query, strlen(query), false);
109 
110  plan = SPI_prepare(query, 0, NULL);
111  if (plan == NULL)
112  elog(ERROR, "SPI_prepare failed: %s",
114 
115  portal = SPI_cursor_open(NULL, plan, NULL, NULL,
116  exec_ctx->curr_proc->fn_readonly);
118 
119  if (portal == NULL)
120  elog(ERROR, "SPI_cursor_open() failed: %s",
122 
123  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
124 
125  PinPortal(portal);
126 
127  PLy_spi_subtransaction_commit(oldcontext, oldowner);
128  }
129  PG_CATCH();
130  {
131  PLy_spi_subtransaction_abort(oldcontext, oldowner);
132  return NULL;
133  }
134  PG_END_TRY();
135 
136  Assert(cursor->portalname != NULL);
137  return (PyObject *) cursor;
138 }
139 
140 PyObject *
141 PLy_cursor_plan(PyObject *ob, PyObject *args)
142 {
144  volatile int nargs;
145  int i;
148  volatile MemoryContext oldcontext;
149  volatile ResourceOwner oldowner;
150 
151  if (args)
152  {
153  if (!PySequence_Check(args) || PyUnicode_Check(args))
154  {
155  PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
156  return NULL;
157  }
158  nargs = PySequence_Length(args);
159  }
160  else
161  nargs = 0;
162 
163  plan = (PLyPlanObject *) ob;
164 
165  if (nargs != plan->nargs)
166  {
167  char *sv;
168  PyObject *so = PyObject_Str(args);
169 
170  if (!so)
171  PLy_elog(ERROR, "could not execute plan");
172  sv = PLyUnicode_AsString(so);
173  PLy_exception_set_plural(PyExc_TypeError,
174  "Expected sequence of %d argument, got %d: %s",
175  "Expected sequence of %d arguments, got %d: %s",
176  plan->nargs,
177  plan->nargs, nargs, sv);
178  Py_DECREF(so);
179 
180  return NULL;
181  }
182 
183  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
184  return NULL;
185  cursor->portalname = NULL;
186  cursor->closed = false;
188  "PL/Python cursor context",
190 
191  /* Initialize for converting result tuples to Python */
192  PLy_input_setup_func(&cursor->result, cursor->mcxt,
193  RECORDOID, -1,
194  exec_ctx->curr_proc);
195 
196  oldcontext = CurrentMemoryContext;
197  oldowner = CurrentResourceOwner;
198 
199  PLy_spi_subtransaction_begin(oldcontext, oldowner);
200 
201  PG_TRY();
202  {
203  Portal portal;
204  char *volatile nulls;
205  volatile int j;
206 
207  if (nargs > 0)
208  nulls = palloc(nargs * sizeof(char));
209  else
210  nulls = NULL;
211 
212  for (j = 0; j < nargs; j++)
213  {
214  PLyObToDatum *arg = &plan->args[j];
215  PyObject *elem;
216 
217  elem = PySequence_GetItem(args, j);
218  PG_TRY(2);
219  {
220  bool isnull;
221 
222  plan->values[j] = PLy_output_convert(arg, elem, &isnull);
223  nulls[j] = isnull ? 'n' : ' ';
224  }
225  PG_FINALLY(2);
226  {
227  Py_DECREF(elem);
228  }
229  PG_END_TRY(2);
230  }
231 
232  portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
233  exec_ctx->curr_proc->fn_readonly);
234  if (portal == NULL)
235  elog(ERROR, "SPI_cursor_open() failed: %s",
237 
238  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
239 
240  PinPortal(portal);
241 
242  PLy_spi_subtransaction_commit(oldcontext, oldowner);
243  }
244  PG_CATCH();
245  {
246  int k;
247 
248  /* cleanup plan->values array */
249  for (k = 0; k < nargs; k++)
250  {
251  if (!plan->args[k].typbyval &&
252  (plan->values[k] != PointerGetDatum(NULL)))
253  {
254  pfree(DatumGetPointer(plan->values[k]));
255  plan->values[k] = PointerGetDatum(NULL);
256  }
257  }
258 
259  Py_DECREF(cursor);
260 
261  PLy_spi_subtransaction_abort(oldcontext, oldowner);
262  return NULL;
263  }
264  PG_END_TRY();
265 
266  for (i = 0; i < nargs; i++)
267  {
268  if (!plan->args[i].typbyval &&
269  (plan->values[i] != PointerGetDatum(NULL)))
270  {
271  pfree(DatumGetPointer(plan->values[i]));
272  plan->values[i] = PointerGetDatum(NULL);
273  }
274  }
275 
276  Assert(cursor->portalname != NULL);
277  return (PyObject *) cursor;
278 }
279 
280 static void
282 {
284  Portal portal;
285 
286  cursor = (PLyCursorObject *) arg;
287 
288  if (!cursor->closed)
289  {
290  portal = GetPortalByName(cursor->portalname);
291 
292  if (PortalIsValid(portal))
293  {
294  UnpinPortal(portal);
295  SPI_cursor_close(portal);
296  }
297  cursor->closed = true;
298  }
299  if (cursor->mcxt)
300  {
302  cursor->mcxt = NULL;
303  }
304  arg->ob_type->tp_free(arg);
305 }
306 
307 static PyObject *
308 PLy_cursor_iternext(PyObject *self)
309 {
311  PyObject *ret;
313  volatile MemoryContext oldcontext;
314  volatile ResourceOwner oldowner;
315  Portal portal;
316 
317  cursor = (PLyCursorObject *) self;
318 
319  if (cursor->closed)
320  {
321  PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
322  return NULL;
323  }
324 
325  portal = GetPortalByName(cursor->portalname);
326  if (!PortalIsValid(portal))
327  {
328  PLy_exception_set(PyExc_ValueError,
329  "iterating a cursor in an aborted subtransaction");
330  return NULL;
331  }
332 
333  oldcontext = CurrentMemoryContext;
334  oldowner = CurrentResourceOwner;
335 
336  PLy_spi_subtransaction_begin(oldcontext, oldowner);
337 
338  PG_TRY();
339  {
340  SPI_cursor_fetch(portal, true, 1);
341  if (SPI_processed == 0)
342  {
343  PyErr_SetNone(PyExc_StopIteration);
344  ret = NULL;
345  }
346  else
347  {
349  exec_ctx->curr_proc);
350 
351  ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
352  SPI_tuptable->tupdesc, true);
353  }
354 
356 
357  PLy_spi_subtransaction_commit(oldcontext, oldowner);
358  }
359  PG_CATCH();
360  {
361  PLy_spi_subtransaction_abort(oldcontext, oldowner);
362  return NULL;
363  }
364  PG_END_TRY();
365 
366  return ret;
367 }
368 
369 static PyObject *
370 PLy_cursor_fetch(PyObject *self, PyObject *args)
371 {
373  int count;
374  PLyResultObject *ret;
376  volatile MemoryContext oldcontext;
377  volatile ResourceOwner oldowner;
378  Portal portal;
379 
380  if (!PyArg_ParseTuple(args, "i:fetch", &count))
381  return NULL;
382 
383  cursor = (PLyCursorObject *) self;
384 
385  if (cursor->closed)
386  {
387  PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
388  return NULL;
389  }
390 
391  portal = GetPortalByName(cursor->portalname);
392  if (!PortalIsValid(portal))
393  {
394  PLy_exception_set(PyExc_ValueError,
395  "iterating a cursor in an aborted subtransaction");
396  return NULL;
397  }
398 
399  ret = (PLyResultObject *) PLy_result_new();
400  if (ret == NULL)
401  return NULL;
402 
403  oldcontext = CurrentMemoryContext;
404  oldowner = CurrentResourceOwner;
405 
406  PLy_spi_subtransaction_begin(oldcontext, oldowner);
407 
408  PG_TRY();
409  {
410  SPI_cursor_fetch(portal, true, count);
411 
412  Py_DECREF(ret->status);
413  ret->status = PyLong_FromLong(SPI_OK_FETCH);
414 
415  Py_DECREF(ret->nrows);
416  ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
417 
418  if (SPI_processed != 0)
419  {
420  uint64 i;
421 
422  /*
423  * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
424  * and list indices; so we cannot support a result larger than
425  * PY_SSIZE_T_MAX.
426  */
427  if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
428  ereport(ERROR,
429  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
430  errmsg("query result has too many rows to fit in a Python list")));
431 
432  Py_DECREF(ret->rows);
433  ret->rows = PyList_New(SPI_processed);
434  if (!ret->rows)
435  {
436  Py_DECREF(ret);
437  ret = NULL;
438  }
439  else
440  {
442  exec_ctx->curr_proc);
443 
444  for (i = 0; i < SPI_processed; i++)
445  {
446  PyObject *row = PLy_input_from_tuple(&cursor->result,
447  SPI_tuptable->vals[i],
449  true);
450 
451  PyList_SetItem(ret->rows, i, row);
452  }
453  }
454  }
455 
457 
458  PLy_spi_subtransaction_commit(oldcontext, oldowner);
459  }
460  PG_CATCH();
461  {
462  PLy_spi_subtransaction_abort(oldcontext, oldowner);
463  return NULL;
464  }
465  PG_END_TRY();
466 
467  return (PyObject *) ret;
468 }
469 
470 static PyObject *
471 PLy_cursor_close(PyObject *self, PyObject *unused)
472 {
474 
475  if (!cursor->closed)
476  {
477  Portal portal = GetPortalByName(cursor->portalname);
478 
479  if (!PortalIsValid(portal))
480  {
481  PLy_exception_set(PyExc_ValueError,
482  "closing a cursor in an aborted subtransaction");
483  return NULL;
484  }
485 
486  UnpinPortal(portal);
487  SPI_cursor_close(portal);
488  cursor->closed = true;
489  }
490 
491  Py_RETURN_NONE;
492 }
#define Assert(condition)
Definition: c.h:858
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define elog(elevel,...)
Definition: elog.h:225
#define PG_FINALLY(...)
Definition: elog.h:388
#define ereport(elevel,...)
Definition: elog.h:149
int j
Definition: isn.c:74
int i
Definition: isn.c:73
#define PLy_elog
bool pg_verifymbstr(const char *mbstr, int len, bool noError)
Definition: mbutils.c:1556
void pfree(void *pointer)
Definition: mcxt.c:1521
MemoryContext TopMemoryContext
Definition: mcxt.c:149
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1683
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
void * palloc(Size size)
Definition: mcxt.c:1317
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
void * arg
#define plan(x)
Definition: pg_regress.c:162
PyObject * PLy_cursor(PyObject *self, PyObject *args)
PyObject * PLy_cursor_plan(PyObject *ob, PyObject *args)
static PyMethodDef PLy_cursor_methods[]
static PyObject * PLy_cursor_query(const char *query)
static PyObject * PLy_cursor_fetch(PyObject *self, PyObject *args)
static PyObject * PLy_cursor_iternext(PyObject *self)
static PyTypeObject PLy_CursorType
static PyObject * PLy_cursor_close(PyObject *self, PyObject *unused)
static void PLy_cursor_dealloc(PyObject *arg)
void PLy_cursor_init_type(void)
static const char PLy_cursor_doc[]
struct PLyCursorObject PLyCursorObject
PyObject * PLy_exc_error
Definition: plpy_elog.c:15
void PLy_exception_set(PyObject *exc, const char *fmt,...)
Definition: plpy_elog.c:472
void PLy_exception_set_plural(PyObject *exc, const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: plpy_elog.c:486
PLyExecutionContext * PLy_current_execution_context(void)
Definition: plpy_main.c:365
PyObject * PLy_result_new(void)
void PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:585
void PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:594
void PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:577
PyObject * PLy_input_from_tuple(PLyDatumToOb *arg, HeapTuple tuple, TupleDesc desc, bool include_generated)
Definition: plpy_typeio.c:134
void PLy_input_setup_func(PLyDatumToOb *arg, MemoryContext arg_mcxt, Oid typeOid, int32 typmod, PLyProcedure *proc)
Definition: plpy_typeio.c:418
void PLy_input_setup_tuple(PLyDatumToOb *arg, TupleDesc desc, PLyProcedure *proc)
Definition: plpy_typeio.c:165
Datum PLy_output_convert(PLyObToDatum *arg, PyObject *val, bool *isnull)
Definition: plpy_typeio.c:120
char * PLyUnicode_AsString(PyObject *unicode)
Definition: plpy_util.c:83
#define PortalIsValid(p)
Definition: portal.h:212
void PinPortal(Portal portal)
Definition: portalmem.c:371
void UnpinPortal(Portal portal)
Definition: portalmem.c:380
Portal GetPortalByName(const char *name)
Definition: portalmem.c:130
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
uint64 SPI_processed
Definition: spi.c:44
int SPI_freeplan(SPIPlanPtr plan)
Definition: spi.c:1022
SPITupleTable * SPI_tuptable
Definition: spi.c:45
int SPI_result
Definition: spi.c:46
const char * SPI_result_code_string(int code)
Definition: spi.c:1969
void SPI_cursor_fetch(Portal portal, bool forward, long count)
Definition: spi.c:1803
void SPI_freetuptable(SPITupleTable *tuptable)
Definition: spi.c:1383
Portal SPI_cursor_open(const char *name, SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only)
Definition: spi.c:1442
SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes)
Definition: spi.c:857
void SPI_cursor_close(Portal portal)
Definition: spi.c:1859
#define SPI_OK_FETCH
Definition: spi.h:84
PLyProcedure * curr_proc
Definition: plpy_main.h:20
PyObject_HEAD PyObject * nrows
const char * name
Definition: portal.h:118
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
Definition: type.h:137