PostgreSQL Source Code  git master
plpy_cursorobject.c
Go to the documentation of this file.
/*
 * the PLyCursor class
 *
 * src/pl/plpython/plpy_cursorobject.c
 */
6 
7 #include "postgres.h"
8 
9 #include <limits.h>
10 
11 #include "catalog/pg_type.h"
12 #include "mb/pg_wchar.h"
13 #include "plpy_cursorobject.h"
14 #include "plpy_elog.h"
15 #include "plpy_main.h"
16 #include "plpy_planobject.h"
17 #include "plpy_resultobject.h"
18 #include "plpy_spi.h"
19 #include "plpython.h"
20 #include "utils/memutils.h"
21 
22 static PyObject *PLy_cursor_query(const char *query);
23 static void PLy_cursor_dealloc(PyObject *arg);
24 static PyObject *PLy_cursor_iternext(PyObject *self);
25 static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
26 static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
27 
28 static const char PLy_cursor_doc[] = "Wrapper around a PostgreSQL cursor";
29 
30 static PyMethodDef PLy_cursor_methods[] = {
31  {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
32  {"close", PLy_cursor_close, METH_NOARGS, NULL},
33  {NULL, NULL, 0, NULL}
34 };
35 
36 static PyTypeObject PLy_CursorType = {
37  PyVarObject_HEAD_INIT(NULL, 0)
38  .tp_name = "PLyCursor",
39  .tp_basicsize = sizeof(PLyCursorObject),
40  .tp_dealloc = PLy_cursor_dealloc,
41  .tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
42  .tp_doc = PLy_cursor_doc,
43  .tp_iter = PyObject_SelfIter,
44  .tp_iternext = PLy_cursor_iternext,
45  .tp_methods = PLy_cursor_methods,
46 };
47 
48 void
50 {
51  if (PyType_Ready(&PLy_CursorType) < 0)
52  elog(ERROR, "could not initialize PLy_CursorType");
53 }
54 
55 PyObject *
56 PLy_cursor(PyObject *self, PyObject *args)
57 {
58  char *query;
59  PyObject *plan;
60  PyObject *planargs = NULL;
61 
62  if (PyArg_ParseTuple(args, "s", &query))
63  return PLy_cursor_query(query);
64 
65  PyErr_Clear();
66 
67  if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
68  return PLy_cursor_plan(plan, planargs);
69 
70  PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
71  return NULL;
72 }
73 
74 
75 static PyObject *
76 PLy_cursor_query(const char *query)
77 {
80  volatile MemoryContext oldcontext;
81  volatile ResourceOwner oldowner;
82 
83  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
84  return NULL;
85  cursor->portalname = NULL;
86  cursor->closed = false;
88  "PL/Python cursor context",
90 
91  /* Initialize for converting result tuples to Python */
92  PLy_input_setup_func(&cursor->result, cursor->mcxt,
93  RECORDOID, -1,
94  exec_ctx->curr_proc);
95 
96  oldcontext = CurrentMemoryContext;
97  oldowner = CurrentResourceOwner;
98 
99  PLy_spi_subtransaction_begin(oldcontext, oldowner);
100 
101  PG_TRY();
102  {
104  Portal portal;
105 
106  pg_verifymbstr(query, strlen(query), false);
107 
108  plan = SPI_prepare(query, 0, NULL);
109  if (plan == NULL)
110  elog(ERROR, "SPI_prepare failed: %s",
112 
113  portal = SPI_cursor_open(NULL, plan, NULL, NULL,
114  exec_ctx->curr_proc->fn_readonly);
116 
117  if (portal == NULL)
118  elog(ERROR, "SPI_cursor_open() failed: %s",
120 
121  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
122 
123  PinPortal(portal);
124 
125  PLy_spi_subtransaction_commit(oldcontext, oldowner);
126  }
127  PG_CATCH();
128  {
129  PLy_spi_subtransaction_abort(oldcontext, oldowner);
130  return NULL;
131  }
132  PG_END_TRY();
133 
134  Assert(cursor->portalname != NULL);
135  return (PyObject *) cursor;
136 }
137 
138 PyObject *
139 PLy_cursor_plan(PyObject *ob, PyObject *args)
140 {
142  volatile int nargs;
143  int i;
146  volatile MemoryContext oldcontext;
147  volatile ResourceOwner oldowner;
148 
149  if (args)
150  {
151  if (!PySequence_Check(args) || PyUnicode_Check(args))
152  {
153  PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
154  return NULL;
155  }
156  nargs = PySequence_Length(args);
157  }
158  else
159  nargs = 0;
160 
161  plan = (PLyPlanObject *) ob;
162 
163  if (nargs != plan->nargs)
164  {
165  char *sv;
166  PyObject *so = PyObject_Str(args);
167 
168  if (!so)
169  PLy_elog(ERROR, "could not execute plan");
170  sv = PLyUnicode_AsString(so);
171  PLy_exception_set_plural(PyExc_TypeError,
172  "Expected sequence of %d argument, got %d: %s",
173  "Expected sequence of %d arguments, got %d: %s",
174  plan->nargs,
175  plan->nargs, nargs, sv);
176  Py_DECREF(so);
177 
178  return NULL;
179  }
180 
181  if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
182  return NULL;
183  cursor->portalname = NULL;
184  cursor->closed = false;
186  "PL/Python cursor context",
188 
189  /* Initialize for converting result tuples to Python */
190  PLy_input_setup_func(&cursor->result, cursor->mcxt,
191  RECORDOID, -1,
192  exec_ctx->curr_proc);
193 
194  oldcontext = CurrentMemoryContext;
195  oldowner = CurrentResourceOwner;
196 
197  PLy_spi_subtransaction_begin(oldcontext, oldowner);
198 
199  PG_TRY();
200  {
201  Portal portal;
202  char *volatile nulls;
203  volatile int j;
204 
205  if (nargs > 0)
206  nulls = palloc(nargs * sizeof(char));
207  else
208  nulls = NULL;
209 
210  for (j = 0; j < nargs; j++)
211  {
212  PLyObToDatum *arg = &plan->args[j];
213  PyObject *elem;
214 
215  elem = PySequence_GetItem(args, j);
216  PG_TRY(2);
217  {
218  bool isnull;
219 
220  plan->values[j] = PLy_output_convert(arg, elem, &isnull);
221  nulls[j] = isnull ? 'n' : ' ';
222  }
223  PG_FINALLY(2);
224  {
225  Py_DECREF(elem);
226  }
227  PG_END_TRY(2);
228  }
229 
230  portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
231  exec_ctx->curr_proc->fn_readonly);
232  if (portal == NULL)
233  elog(ERROR, "SPI_cursor_open() failed: %s",
235 
236  cursor->portalname = MemoryContextStrdup(cursor->mcxt, portal->name);
237 
238  PinPortal(portal);
239 
240  PLy_spi_subtransaction_commit(oldcontext, oldowner);
241  }
242  PG_CATCH();
243  {
244  int k;
245 
246  /* cleanup plan->values array */
247  for (k = 0; k < nargs; k++)
248  {
249  if (!plan->args[k].typbyval &&
250  (plan->values[k] != PointerGetDatum(NULL)))
251  {
252  pfree(DatumGetPointer(plan->values[k]));
253  plan->values[k] = PointerGetDatum(NULL);
254  }
255  }
256 
257  Py_DECREF(cursor);
258 
259  PLy_spi_subtransaction_abort(oldcontext, oldowner);
260  return NULL;
261  }
262  PG_END_TRY();
263 
264  for (i = 0; i < nargs; i++)
265  {
266  if (!plan->args[i].typbyval &&
267  (plan->values[i] != PointerGetDatum(NULL)))
268  {
269  pfree(DatumGetPointer(plan->values[i]));
270  plan->values[i] = PointerGetDatum(NULL);
271  }
272  }
273 
274  Assert(cursor->portalname != NULL);
275  return (PyObject *) cursor;
276 }
277 
278 static void
280 {
282  Portal portal;
283 
284  cursor = (PLyCursorObject *) arg;
285 
286  if (!cursor->closed)
287  {
288  portal = GetPortalByName(cursor->portalname);
289 
290  if (PortalIsValid(portal))
291  {
292  UnpinPortal(portal);
293  SPI_cursor_close(portal);
294  }
295  cursor->closed = true;
296  }
297  if (cursor->mcxt)
298  {
300  cursor->mcxt = NULL;
301  }
302  arg->ob_type->tp_free(arg);
303 }
304 
305 static PyObject *
306 PLy_cursor_iternext(PyObject *self)
307 {
309  PyObject *ret;
311  volatile MemoryContext oldcontext;
312  volatile ResourceOwner oldowner;
313  Portal portal;
314 
315  cursor = (PLyCursorObject *) self;
316 
317  if (cursor->closed)
318  {
319  PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
320  return NULL;
321  }
322 
323  portal = GetPortalByName(cursor->portalname);
324  if (!PortalIsValid(portal))
325  {
326  PLy_exception_set(PyExc_ValueError,
327  "iterating a cursor in an aborted subtransaction");
328  return NULL;
329  }
330 
331  oldcontext = CurrentMemoryContext;
332  oldowner = CurrentResourceOwner;
333 
334  PLy_spi_subtransaction_begin(oldcontext, oldowner);
335 
336  PG_TRY();
337  {
338  SPI_cursor_fetch(portal, true, 1);
339  if (SPI_processed == 0)
340  {
341  PyErr_SetNone(PyExc_StopIteration);
342  ret = NULL;
343  }
344  else
345  {
347  exec_ctx->curr_proc);
348 
349  ret = PLy_input_from_tuple(&cursor->result, SPI_tuptable->vals[0],
350  SPI_tuptable->tupdesc, true);
351  }
352 
354 
355  PLy_spi_subtransaction_commit(oldcontext, oldowner);
356  }
357  PG_CATCH();
358  {
359  PLy_spi_subtransaction_abort(oldcontext, oldowner);
360  return NULL;
361  }
362  PG_END_TRY();
363 
364  return ret;
365 }
366 
367 static PyObject *
368 PLy_cursor_fetch(PyObject *self, PyObject *args)
369 {
371  int count;
372  PLyResultObject *ret;
374  volatile MemoryContext oldcontext;
375  volatile ResourceOwner oldowner;
376  Portal portal;
377 
378  if (!PyArg_ParseTuple(args, "i:fetch", &count))
379  return NULL;
380 
381  cursor = (PLyCursorObject *) self;
382 
383  if (cursor->closed)
384  {
385  PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
386  return NULL;
387  }
388 
389  portal = GetPortalByName(cursor->portalname);
390  if (!PortalIsValid(portal))
391  {
392  PLy_exception_set(PyExc_ValueError,
393  "iterating a cursor in an aborted subtransaction");
394  return NULL;
395  }
396 
397  ret = (PLyResultObject *) PLy_result_new();
398  if (ret == NULL)
399  return NULL;
400 
401  oldcontext = CurrentMemoryContext;
402  oldowner = CurrentResourceOwner;
403 
404  PLy_spi_subtransaction_begin(oldcontext, oldowner);
405 
406  PG_TRY();
407  {
408  SPI_cursor_fetch(portal, true, count);
409 
410  Py_DECREF(ret->status);
411  ret->status = PyLong_FromLong(SPI_OK_FETCH);
412 
413  Py_DECREF(ret->nrows);
414  ret->nrows = PyLong_FromUnsignedLongLong(SPI_processed);
415 
416  if (SPI_processed != 0)
417  {
418  uint64 i;
419 
420  /*
421  * PyList_New() and PyList_SetItem() use Py_ssize_t for list size
422  * and list indices; so we cannot support a result larger than
423  * PY_SSIZE_T_MAX.
424  */
425  if (SPI_processed > (uint64) PY_SSIZE_T_MAX)
426  ereport(ERROR,
427  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
428  errmsg("query result has too many rows to fit in a Python list")));
429 
430  Py_DECREF(ret->rows);
431  ret->rows = PyList_New(SPI_processed);
432  if (!ret->rows)
433  {
434  Py_DECREF(ret);
435  ret = NULL;
436  }
437  else
438  {
440  exec_ctx->curr_proc);
441 
442  for (i = 0; i < SPI_processed; i++)
443  {
444  PyObject *row = PLy_input_from_tuple(&cursor->result,
445  SPI_tuptable->vals[i],
447  true);
448 
449  PyList_SetItem(ret->rows, i, row);
450  }
451  }
452  }
453 
455 
456  PLy_spi_subtransaction_commit(oldcontext, oldowner);
457  }
458  PG_CATCH();
459  {
460  PLy_spi_subtransaction_abort(oldcontext, oldowner);
461  return NULL;
462  }
463  PG_END_TRY();
464 
465  return (PyObject *) ret;
466 }
467 
468 static PyObject *
469 PLy_cursor_close(PyObject *self, PyObject *unused)
470 {
472 
473  if (!cursor->closed)
474  {
475  Portal portal = GetPortalByName(cursor->portalname);
476 
477  if (!PortalIsValid(portal))
478  {
479  PLy_exception_set(PyExc_ValueError,
480  "closing a cursor in an aborted subtransaction");
481  return NULL;
482  }
483 
484  UnpinPortal(portal);
485  SPI_cursor_close(portal);
486  cursor->closed = true;
487  }
488 
489  Py_RETURN_NONE;
490 }
#define Assert(condition)
Definition: c.h:837
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define elog(elevel,...)
Definition: elog.h:225
#define PG_FINALLY(...)
Definition: elog.h:388
#define ereport(elevel,...)
Definition: elog.h:149
int j
Definition: isn.c:73
int i
Definition: isn.c:72
#define PLy_elog
bool pg_verifymbstr(const char *mbstr, int len, bool noError)
Definition: mbutils.c:1556
void pfree(void *pointer)
Definition: mcxt.c:1521
MemoryContext TopMemoryContext
Definition: mcxt.c:149
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1683
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
void * palloc(Size size)
Definition: mcxt.c:1317
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
void * arg
#define plan(x)
Definition: pg_regress.c:161
PyObject * PLy_cursor(PyObject *self, PyObject *args)
PyObject * PLy_cursor_plan(PyObject *ob, PyObject *args)
static PyMethodDef PLy_cursor_methods[]
static PyObject * PLy_cursor_query(const char *query)
static PyObject * PLy_cursor_fetch(PyObject *self, PyObject *args)
static PyObject * PLy_cursor_iternext(PyObject *self)
static PyTypeObject PLy_CursorType
static PyObject * PLy_cursor_close(PyObject *self, PyObject *unused)
static void PLy_cursor_dealloc(PyObject *arg)
void PLy_cursor_init_type(void)
static const char PLy_cursor_doc[]
struct PLyCursorObject PLyCursorObject
PyObject * PLy_exc_error
Definition: plpy_elog.c:15
void PLy_exception_set(PyObject *exc, const char *fmt,...)
Definition: plpy_elog.c:472
void PLy_exception_set_plural(PyObject *exc, const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: plpy_elog.c:486
PLyExecutionContext * PLy_current_execution_context(void)
Definition: plpy_main.c:365
PyObject * PLy_result_new(void)
void PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:582
void PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:591
void PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner)
Definition: plpy_spi.c:574
PyObject * PLy_input_from_tuple(PLyDatumToOb *arg, HeapTuple tuple, TupleDesc desc, bool include_generated)
Definition: plpy_typeio.c:134
void PLy_input_setup_func(PLyDatumToOb *arg, MemoryContext arg_mcxt, Oid typeOid, int32 typmod, PLyProcedure *proc)
Definition: plpy_typeio.c:418
void PLy_input_setup_tuple(PLyDatumToOb *arg, TupleDesc desc, PLyProcedure *proc)
Definition: plpy_typeio.c:165
Datum PLy_output_convert(PLyObToDatum *arg, PyObject *val, bool *isnull)
Definition: plpy_typeio.c:120
char * PLyUnicode_AsString(PyObject *unicode)
Definition: plpy_util.c:82
#define PortalIsValid(p)
Definition: portal.h:212
void PinPortal(Portal portal)
Definition: portalmem.c:371
void UnpinPortal(Portal portal)
Definition: portalmem.c:380
Portal GetPortalByName(const char *name)
Definition: portalmem.c:130
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
static Pointer DatumGetPointer(Datum X)
Definition: postgres.h:312
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
uint64 SPI_processed
Definition: spi.c:44
int SPI_freeplan(SPIPlanPtr plan)
Definition: spi.c:1025
SPITupleTable * SPI_tuptable
Definition: spi.c:45
int SPI_result
Definition: spi.c:46
const char * SPI_result_code_string(int code)
Definition: spi.c:1972
void SPI_cursor_fetch(Portal portal, bool forward, long count)
Definition: spi.c:1806
void SPI_freetuptable(SPITupleTable *tuptable)
Definition: spi.c:1386
Portal SPI_cursor_open(const char *name, SPIPlanPtr plan, Datum *Values, const char *Nulls, bool read_only)
Definition: spi.c:1445
SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes)
Definition: spi.c:860
void SPI_cursor_close(Portal portal)
Definition: spi.c:1862
#define SPI_OK_FETCH
Definition: spi.h:84
PLyProcedure * curr_proc
Definition: plpy_main.h:20
PyObject_HEAD PyObject * nrows
const char * name
Definition: portal.h:118
TupleDesc tupdesc
Definition: spi.h:25
HeapTuple * vals
Definition: spi.h:26
Definition: type.h:138