PostgreSQL Source Code  git master
libpq-be-fe-helpers.h
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * libpq-be-fe-helpers.h
4  * Helper functions for using libpq in extensions
5  *
6  * Code built directly into the backend is not allowed to link to libpq
7  * directly. Extension code, however, is allowed to use libpq. Even so, such
8  * code has to be careful not to block inside libpq, otherwise
9  * interrupts will not be processed, leading to issues like unresolvable
10  * deadlocks. Backend code also needs to take care to acquire/release an
11  * external fd for the connection, otherwise fd.c's accounting of fd's is
12  * broken.
13  *
14  * This file provides helper functions to make it easier to comply with these
15  * rules. It is a header only library as it needs to be linked into each
16  * extension using libpq, and it seems too small to be worth adding a
17  * dedicated static library for.
18  *
19  * TODO: For historical reasons the connections established here are not put
20  * into non-blocking mode. That can lead to blocking even when only the async
21  * libpq functions are used. This should be fixed.
22  *
23  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
24  * Portions Copyright (c) 1994, Regents of the University of California
25  *
26  * src/include/libpq/libpq-be-fe-helpers.h
27  *
28  *-------------------------------------------------------------------------
29  */
30 #ifndef LIBPQ_BE_FE_HELPERS_H
31 #define LIBPQ_BE_FE_HELPERS_H
32 
33 /*
34  * Despite the name, BUILDING_DLL is set only when building code directly part
35  * of the backend. Which also is where libpq isn't allowed to be
36  * used. Obviously this doesn't protect against libpq-fe.h getting included
37  * otherwise, but perhaps still protects against a few mistakes...
38  */
39 #ifdef BUILDING_DLL
40 #error "libpq may not be used code directly built into the backend"
41 #endif
42 
43 #include "libpq-fe.h"
44 #include "miscadmin.h"
45 #include "storage/fd.h"
46 #include "storage/latch.h"
47 #include "utils/timestamp.h"
48 #include "utils/wait_event.h"
49 
50 
51 static inline void libpqsrv_connect_prepare(void);
52 static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
53 static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
54 static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
55 
56 
57 /*
58  * PQconnectdb() wrapper that reserves a file descriptor and processes
59  * interrupts during connection establishment.
60  *
61  * Throws an error if AcquireExternalFD() fails, but does not throw if
62  * connection establishment itself fails. Callers need to use PQstatus() to
63  * check if connection establishment succeeded.
64  */
65 static inline PGconn *
66 libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
67 {
68  PGconn *conn = NULL;
69 
71 
72  conn = PQconnectStart(conninfo);
73 
74  libpqsrv_connect_internal(conn, wait_event_info);
75 
76  return conn;
77 }
78 
79 /*
80  * Like libpqsrv_connect(), except that this is a wrapper for
81  * PQconnectdbParams().
82  */
83 static inline PGconn *
84 libpqsrv_connect_params(const char *const *keywords,
85  const char *const *values,
86  int expand_dbname,
87  uint32 wait_event_info)
88 {
89  PGconn *conn = NULL;
90 
92 
93  conn = PQconnectStartParams(keywords, values, expand_dbname);
94 
95  libpqsrv_connect_internal(conn, wait_event_info);
96 
97  return conn;
98 }
99 
100 /*
101  * PQfinish() wrapper that additionally releases the reserved file descriptor.
102  *
103  * It is allowed to call this with a NULL pgconn iff NULL was returned by
104  * libpqsrv_connect*.
105  */
106 static inline void
108 {
109  /*
110  * If no connection was established, we haven't reserved an FD for it (or
111  * already released it). This rule makes it easier to write PG_CATCH()
112  * handlers for this facility's users.
113  *
114  * See also libpqsrv_connect_internal().
115  */
116  if (conn == NULL)
117  return;
118 
120  PQfinish(conn);
121 }
122 
123 
124 /* internal helper functions follow */
125 
126 
127 /*
128  * Helper function for all connection establishment functions.
129  */
130 static inline void
132 {
133  /*
134  * We must obey fd.c's limit on non-virtual file descriptors. Assume that
135  * a PGconn represents one long-lived FD. (Doing this here also ensures
136  * that VFDs are closed if needed to make room.)
137  */
138  if (!AcquireExternalFD())
139  {
140 #ifndef WIN32 /* can't write #if within ereport() macro */
141  ereport(ERROR,
142  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
143  errmsg("could not establish connection"),
144  errdetail("There are too many open files on the local server."),
145  errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
146 #else
147  ereport(ERROR,
148  (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
149  errmsg("could not establish connection"),
150  errdetail("There are too many open files on the local server."),
151  errhint("Raise the server's \"max_files_per_process\" setting.")));
152 #endif
153  }
154 }
155 
156 /*
157  * Helper function for all connection establishment functions.
158  */
159 static inline void
161 {
162  /*
163  * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
164  * that here.
165  */
166  if (conn == NULL)
167  {
169  return;
170  }
171 
172  /*
173  * Can't wait without a socket. Note that we don't want to close the libpq
174  * connection yet, so callers can emit a useful error.
175  */
176  if (PQstatus(conn) == CONNECTION_BAD)
177  return;
178 
179  /*
180  * WaitLatchOrSocket() can conceivably fail, handle that case here instead
181  * of requiring all callers to do so.
182  */
183  PG_TRY();
184  {
186 
187  /*
188  * Poll connection until we have OK or FAILED status.
189  *
190  * Per spec for PQconnectPoll, first wait till socket is write-ready.
191  */
192  status = PGRES_POLLING_WRITING;
193  while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
194  {
195  int io_flag;
196  int rc;
197 
198  if (status == PGRES_POLLING_READING)
199  io_flag = WL_SOCKET_READABLE;
200 #ifdef WIN32
201 
202  /*
203  * Windows needs a different test while waiting for
204  * connection-made
205  */
206  else if (PQstatus(conn) == CONNECTION_STARTED)
207  io_flag = WL_SOCKET_CONNECTED;
208 #endif
209  else
210  io_flag = WL_SOCKET_WRITEABLE;
211 
213  WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
214  PQsocket(conn),
215  0,
216  wait_event_info);
217 
218  /* Interrupted? */
219  if (rc & WL_LATCH_SET)
220  {
223  }
224 
225  /* If socket is ready, advance the libpq state machine */
226  if (rc & io_flag)
227  status = PQconnectPoll(conn);
228  }
229  }
230  PG_CATCH();
231  {
232  /*
233  * If an error is thrown here, the callers won't call
234  * libpqsrv_disconnect() with a conn, so release resources
235  * immediately.
236  */
238  PQfinish(conn);
239 
240  PG_RE_THROW();
241  }
242  PG_END_TRY();
243 }
244 
245 /*
246  * PQexec() wrapper that processes interrupts.
247  *
248  * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
249  * interrupts while pushing the query text to the server. Consider that
250  * setting if query strings can be long relative to TCP buffer size.
251  *
252  * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
253  * notably, PQexec() would silently discard any prior query results.
254  */
255 static inline PGresult *
256 libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
257 {
258  if (!PQsendQuery(conn, query))
259  return NULL;
260  return libpqsrv_get_result_last(conn, wait_event_info);
261 }
262 
263 /*
264  * PQexecParams() wrapper that processes interrupts.
265  *
266  * See notes at libpqsrv_exec().
267  */
268 static inline PGresult *
270  const char *command,
271  int nParams,
272  const Oid *paramTypes,
273  const char *const *paramValues,
274  const int *paramLengths,
275  const int *paramFormats,
276  int resultFormat,
277  uint32 wait_event_info)
278 {
279  if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
280  paramLengths, paramFormats, resultFormat))
281  return NULL;
282  return libpqsrv_get_result_last(conn, wait_event_info);
283 }
284 
285 /*
286  * Like PQexec(), loop over PQgetResult() until it returns NULL or another
287  * terminal state. Return the last non-NULL result or the terminal state.
288  */
289 static inline PGresult *
291 {
292  PGresult *volatile lastResult = NULL;
293 
294  /* In what follows, do not leak any PGresults on an error. */
295  PG_TRY();
296  {
297  for (;;)
298  {
299  /* Wait for, and collect, the next PGresult. */
300  PGresult *result;
301 
302  result = libpqsrv_get_result(conn, wait_event_info);
303  if (result == NULL)
304  break; /* query is complete, or failure */
305 
306  /*
307  * Emulate PQexec()'s behavior of returning the last result when
308  * there are many.
309  */
310  PQclear(lastResult);
311  lastResult = result;
312 
313  if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
314  PQresultStatus(lastResult) == PGRES_COPY_OUT ||
315  PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
317  break;
318  }
319  }
320  PG_CATCH();
321  {
322  PQclear(lastResult);
323  PG_RE_THROW();
324  }
325  PG_END_TRY();
326 
327  return lastResult;
328 }
329 
330 /*
331  * Perform the equivalent of PQgetResult(), but watch for interrupts.
332  */
333 static inline PGresult *
335 {
336  /*
337  * Collect data until PQgetResult is ready to get the result without
338  * blocking.
339  */
340  while (PQisBusy(conn))
341  {
342  int rc;
343 
347  PQsocket(conn),
348  0,
349  wait_event_info);
350 
351  /* Interrupted? */
352  if (rc & WL_LATCH_SET)
353  {
356  }
357 
358  /* Consume whatever data is available from the socket */
359  if (PQconsumeInput(conn) == 0)
360  {
361  /* trouble; expect PQgetResult() to return NULL */
362  break;
363  }
364  }
365 
366  /* Now we can collect and return the next PGresult */
367  return PQgetResult(conn);
368 }
369 
370 /*
371  * Submit a cancel request to the given connection, waiting only until
372  * the given time.
373  *
374  * We sleep interruptibly until we receive confirmation that the cancel
375  * request has been accepted, and if it is, return NULL; if the cancel
376  * request fails, return an error message string (which is not to be
377  * freed).
378  *
379  * For other problems (to wit: OOM when strdup'ing an error message from
380  * libpq), this function can ereport(ERROR).
381  *
382  * Note: this function leaks a string's worth of memory when reporting
383  * libpq errors. Make sure to call it in a transient memory context.
384  */
385 static inline const char *
387 {
388  PGcancelConn *cancel_conn;
389  const char *error = NULL;
390 
391  cancel_conn = PQcancelCreate(conn);
392  if (cancel_conn == NULL)
393  return "out of memory";
394 
395  /* In what follows, do not leak any PGcancelConn on any errors. */
396 
397  PG_TRY();
398  {
399  if (!PQcancelStart(cancel_conn))
400  {
401  error = pchomp(PQcancelErrorMessage(cancel_conn));
402  goto exit;
403  }
404 
405  for (;;)
406  {
409  long cur_timeout;
410  int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
411 
412  pollres = PQcancelPoll(cancel_conn);
413  if (pollres == PGRES_POLLING_OK)
414  break; /* success! */
415 
416  /* If timeout has expired, give up, else get sleep time. */
418  cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
419  if (cur_timeout <= 0)
420  {
421  error = "cancel request timed out";
422  break;
423  }
424 
425  switch (pollres)
426  {
428  waitEvents |= WL_SOCKET_READABLE;
429  break;
431  waitEvents |= WL_SOCKET_WRITEABLE;
432  break;
433  default:
434  error = pchomp(PQcancelErrorMessage(cancel_conn));
435  goto exit;
436  }
437 
438  /* Sleep until there's something to do */
439  WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
440  cur_timeout, PG_WAIT_CLIENT);
441 
443 
445  }
446 exit: ;
447  }
448  PG_FINALLY();
449  {
450  PQcancelFinish(cancel_conn);
451  }
452  PG_END_TRY();
453 
454  return error;
455 }
456 
457 #endif /* LIBPQ_BE_FE_HELPERS_H */
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
static Datum values[MAXATTR]
Definition: bootstrap.c:151
unsigned int uint32
Definition: c.h:492
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PG_RE_THROW()
Definition: elog.h:412
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define PG_FINALLY(...)
Definition: elog.h:388
#define ereport(elevel,...)
Definition: elog.h:149
void ReleaseExternalFD(void)
Definition: fd.c:1238
bool AcquireExternalFD(void)
Definition: fd.c:1185
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:65
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:306
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:207
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:334
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:294
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:185
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:792
PGconn * PQconnectStart(const char *conninfo)
Definition: fe-connect.c:873
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2597
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7137
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4879
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7226
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1492
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
struct Latch * MyLatch
Definition: globals.c:62
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:565
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_TIMEOUT
Definition: latch.h:130
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:132
#define WL_LATCH_SET
Definition: latch.h:127
#define WL_SOCKET_CONNECTED
Definition: latch.h:137
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static PGresult * libpqsrv_exec_params(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat, uint32 wait_event_info)
static void libpqsrv_connect_prepare(void)
static PGconn * libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
static PGresult * libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
static PGresult * libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
static void libpqsrv_disconnect(PGconn *conn)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
@ CONNECTION_STARTED
Definition: libpq-fe.h:89
@ CONNECTION_BAD
Definition: libpq-fe.h:82
@ PGRES_COPY_IN
Definition: libpq-fe.h:127
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:132
@ PGRES_COPY_OUT
Definition: libpq-fe.h:126
PostgresPollingStatusType
Definition: libpq-fe.h:109
@ PGRES_POLLING_OK
Definition: libpq-fe.h:113
@ PGRES_POLLING_READING
Definition: libpq-fe.h:111
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:112
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:110
exit(1)
char * pchomp(const char *in)
Definition: mcxt.c:1724
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
unsigned int Oid
Definition: postgres_ext.h:31
static void error(void)
Definition: sql-dyntest.c:147
PGconn * conn
Definition: streamutil.c:53
#define PG_WAIT_CLIENT
Definition: wait_event.h:22