PostgreSQL Source Code git master
libpq-be-fe-helpers.h
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * libpq-be-fe-helpers.h
 *	  Helper functions for using libpq in extensions
 *
 * Code built directly into the backend is not allowed to link to libpq
 * directly. Extension code is allowed to use libpq, however. Even so, libpq
 * used in extensions has to be careful not to block inside libpq, otherwise
 * interrupts will not be processed, leading to issues like unresolvable
 * deadlocks. Backend code also needs to take care to acquire/release an
 * external fd for the connection, otherwise fd.c's accounting of fds is
 * broken.
 *
 * This file provides helper functions to make it easier to comply with these
 * rules. It is a header-only library, as it needs to be linked into each
 * extension using libpq, and it seems too small to be worth adding a
 * dedicated static library for.
 *
 * TODO: For historical reasons the connections established here are not put
 * into non-blocking mode. That can lead to blocking even when only the async
 * libpq functions are used. This should be fixed.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/libpq/libpq-be-fe-helpers.h
 *
 *-------------------------------------------------------------------------
 */
30#ifndef LIBPQ_BE_FE_HELPERS_H
31#define LIBPQ_BE_FE_HELPERS_H
32
33/*
34 * Despite the name, BUILDING_DLL is set only when building code directly part
35 * of the backend. Which also is where libpq isn't allowed to be
36 * used. Obviously this doesn't protect against libpq-fe.h getting included
37 * otherwise, but perhaps still protects against a few mistakes...
38 */
39#ifdef BUILDING_DLL
40#error "libpq may not be used code directly built into the backend"
41#endif
42
43#include "libpq-fe.h"
44#include "miscadmin.h"
45#include "storage/fd.h"
46#include "storage/latch.h"
47#include "utils/timestamp.h"
48#include "utils/wait_event.h"
49
50
51static inline void libpqsrv_connect_prepare(void);
52static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
53static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
54static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
55
56
57/*
58 * PQconnectdb() wrapper that reserves a file descriptor and processes
59 * interrupts during connection establishment.
60 *
61 * Throws an error if AcquireExternalFD() fails, but does not throw if
62 * connection establishment itself fails. Callers need to use PQstatus() to
63 * check if connection establishment succeeded.
64 */
65static inline PGconn *
66libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
67{
68 PGconn *conn = NULL;
69
71
72 conn = PQconnectStart(conninfo);
73
74 libpqsrv_connect_internal(conn, wait_event_info);
75
76 return conn;
77}
78
79/*
80 * Like libpqsrv_connect(), except that this is a wrapper for
81 * PQconnectdbParams().
82 */
83static inline PGconn *
85 const char *const *values,
86 int expand_dbname,
87 uint32 wait_event_info)
88{
89 PGconn *conn = NULL;
90
92
93 conn = PQconnectStartParams(keywords, values, expand_dbname);
94
95 libpqsrv_connect_internal(conn, wait_event_info);
96
97 return conn;
98}
99
100/*
101 * PQfinish() wrapper that additionally releases the reserved file descriptor.
102 *
103 * It is allowed to call this with a NULL pgconn iff NULL was returned by
104 * libpqsrv_connect*.
105 */
106static inline void
108{
109 /*
110 * If no connection was established, we haven't reserved an FD for it (or
111 * already released it). This rule makes it easier to write PG_CATCH()
112 * handlers for this facility's users.
113 *
114 * See also libpqsrv_connect_internal().
115 */
116 if (conn == NULL)
117 return;
118
120 PQfinish(conn);
121}
122
123
124/* internal helper functions follow */
125
126
127/*
128 * Helper function for all connection establishment functions.
129 */
130static inline void
132{
133 /*
134 * We must obey fd.c's limit on non-virtual file descriptors. Assume that
135 * a PGconn represents one long-lived FD. (Doing this here also ensures
136 * that VFDs are closed if needed to make room.)
137 */
138 if (!AcquireExternalFD())
139 {
140#ifndef WIN32 /* can't write #if within ereport() macro */
142 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
143 errmsg("could not establish connection"),
144 errdetail("There are too many open files on the local server."),
145 errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
146#else
148 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
149 errmsg("could not establish connection"),
150 errdetail("There are too many open files on the local server."),
151 errhint("Raise the server's \"max_files_per_process\" setting.")));
152#endif
153 }
154}
155
156/*
157 * Helper function for all connection establishment functions.
158 */
159static inline void
161{
162 /*
163 * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
164 * that here.
165 */
166 if (conn == NULL)
167 {
169 return;
170 }
171
172 /*
173 * Can't wait without a socket. Note that we don't want to close the libpq
174 * connection yet, so callers can emit a useful error.
175 */
177 return;
178
179 /*
180 * WaitLatchOrSocket() can conceivably fail, handle that case here instead
181 * of requiring all callers to do so.
182 */
183 PG_TRY();
184 {
186
187 /*
188 * Poll connection until we have OK or FAILED status.
189 *
190 * Per spec for PQconnectPoll, first wait till socket is write-ready.
191 */
192 status = PGRES_POLLING_WRITING;
193 while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
194 {
195 int io_flag;
196 int rc;
197
198 if (status == PGRES_POLLING_READING)
199 io_flag = WL_SOCKET_READABLE;
200#ifdef WIN32
201
202 /*
203 * Windows needs a different test while waiting for
204 * connection-made
205 */
206 else if (PQstatus(conn) == CONNECTION_STARTED)
207 io_flag = WL_SOCKET_CONNECTED;
208#endif
209 else
210 io_flag = WL_SOCKET_WRITEABLE;
211
214 PQsocket(conn),
215 0,
216 wait_event_info);
217
218 /* Interrupted? */
219 if (rc & WL_LATCH_SET)
220 {
223 }
224
225 /* If socket is ready, advance the libpq state machine */
226 if (rc & io_flag)
227 status = PQconnectPoll(conn);
228 }
229 }
230 PG_CATCH();
231 {
232 /*
233 * If an error is thrown here, the callers won't call
234 * libpqsrv_disconnect() with a conn, so release resources
235 * immediately.
236 */
238 PQfinish(conn);
239
240 PG_RE_THROW();
241 }
242 PG_END_TRY();
243}
244
245/*
246 * PQexec() wrapper that processes interrupts.
247 *
248 * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
249 * interrupts while pushing the query text to the server. Consider that
250 * setting if query strings can be long relative to TCP buffer size.
251 *
252 * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
253 * notably, PQexec() would silently discard any prior query results.
254 */
255static inline PGresult *
256libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
257{
258 if (!PQsendQuery(conn, query))
259 return NULL;
260 return libpqsrv_get_result_last(conn, wait_event_info);
261}
262
263/*
264 * PQexecParams() wrapper that processes interrupts.
265 *
266 * See notes at libpqsrv_exec().
267 */
268static inline PGresult *
270 const char *command,
271 int nParams,
272 const Oid *paramTypes,
273 const char *const *paramValues,
274 const int *paramLengths,
275 const int *paramFormats,
276 int resultFormat,
277 uint32 wait_event_info)
278{
279 if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
280 paramLengths, paramFormats, resultFormat))
281 return NULL;
282 return libpqsrv_get_result_last(conn, wait_event_info);
283}
284
285/*
286 * Like PQexec(), loop over PQgetResult() until it returns NULL or another
287 * terminal state. Return the last non-NULL result or the terminal state.
288 */
289static inline PGresult *
291{
292 PGresult *volatile lastResult = NULL;
293
294 /* In what follows, do not leak any PGresults on an error. */
295 PG_TRY();
296 {
297 for (;;)
298 {
299 /* Wait for, and collect, the next PGresult. */
300 PGresult *result;
301
302 result = libpqsrv_get_result(conn, wait_event_info);
303 if (result == NULL)
304 break; /* query is complete, or failure */
305
306 /*
307 * Emulate PQexec()'s behavior of returning the last result when
308 * there are many.
309 */
310 PQclear(lastResult);
311 lastResult = result;
312
313 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
314 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
315 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
317 break;
318 }
319 }
320 PG_CATCH();
321 {
322 PQclear(lastResult);
323 PG_RE_THROW();
324 }
325 PG_END_TRY();
326
327 return lastResult;
328}
329
330/*
331 * Perform the equivalent of PQgetResult(), but watch for interrupts.
332 */
333static inline PGresult *
335{
336 /*
337 * Collect data until PQgetResult is ready to get the result without
338 * blocking.
339 */
340 while (PQisBusy(conn))
341 {
342 int rc;
343
347 PQsocket(conn),
348 0,
349 wait_event_info);
350
351 /* Interrupted? */
352 if (rc & WL_LATCH_SET)
353 {
356 }
357
358 /* Consume whatever data is available from the socket */
359 if (PQconsumeInput(conn) == 0)
360 {
361 /* trouble; expect PQgetResult() to return NULL */
362 break;
363 }
364 }
365
366 /* Now we can collect and return the next PGresult */
367 return PQgetResult(conn);
368}
369
370/*
371 * Submit a cancel request to the given connection, waiting only until
372 * the given time.
373 *
374 * We sleep interruptibly until we receive confirmation that the cancel
375 * request has been accepted, and if it is, return NULL; if the cancel
376 * request fails, return an error message string (which is not to be
377 * freed).
378 *
379 * For other problems (to wit: OOM when strdup'ing an error message from
380 * libpq), this function can ereport(ERROR).
381 *
382 * Note: this function leaks a string's worth of memory when reporting
383 * libpq errors. Make sure to call it in a transient memory context.
384 */
385static inline const char *
387{
388 PGcancelConn *cancel_conn;
389 const char *error = NULL;
390
391 cancel_conn = PQcancelCreate(conn);
392 if (cancel_conn == NULL)
393 return "out of memory";
394
395 /* In what follows, do not leak any PGcancelConn on any errors. */
396
397 PG_TRY();
398 {
399 if (!PQcancelStart(cancel_conn))
400 {
401 error = pchomp(PQcancelErrorMessage(cancel_conn));
402 goto exit;
403 }
404
405 for (;;)
406 {
409 long cur_timeout;
410 int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
411
412 pollres = PQcancelPoll(cancel_conn);
413 if (pollres == PGRES_POLLING_OK)
414 break; /* success! */
415
416 /* If timeout has expired, give up, else get sleep time. */
418 cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
419 if (cur_timeout <= 0)
420 {
421 error = "cancel request timed out";
422 break;
423 }
424
425 switch (pollres)
426 {
428 waitEvents |= WL_SOCKET_READABLE;
429 break;
431 waitEvents |= WL_SOCKET_WRITEABLE;
432 break;
433 default:
434 error = pchomp(PQcancelErrorMessage(cancel_conn));
435 goto exit;
436 }
437
438 /* Sleep until there's something to do */
439 WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
440 cur_timeout, PG_WAIT_CLIENT);
441
443
445 }
446exit: ;
447 }
448 PG_FINALLY();
449 {
450 PQcancelFinish(cancel_conn);
451 }
452 PG_END_TRY();
453
454 return error;
455}
456
457#endif /* LIBPQ_BE_FE_HELPERS_H */
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
static Datum values[MAXATTR]
Definition: bootstrap.c:151
uint32_t uint32
Definition: c.h:502
int64 TimestampTz
Definition: timestamp.h:39
int errdetail(const char *fmt,...)
Definition: elog.c:1203
int errhint(const char *fmt,...)
Definition: elog.c:1317
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define PG_RE_THROW()
Definition: elog.h:404
#define PG_TRY(...)
Definition: elog.h:371
#define PG_END_TRY(...)
Definition: elog.h:396
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:381
#define PG_FINALLY(...)
Definition: elog.h:388
#define ereport(elevel,...)
Definition: elog.h:149
void ReleaseExternalFD(void)
Definition: fd.c:1238
bool AcquireExternalFD(void)
Definition: fd.c:1185
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition: fe-cancel.c:65
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition: fe-cancel.c:207
void PQcancelFinish(PGcancelConn *cancelConn)
Definition: fe-cancel.c:334
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:294
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition: fe-cancel.c:306
int PQcancelStart(PGcancelConn *cancelConn)
Definition: fe-cancel.c:185
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2842
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7490
PGconn * PQconnectStart(const char *conninfo)
Definition: fe-connect.c:920
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition: fe-connect.c:839
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5224
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7579
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition: fe-exec.c:1492
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
void PQclear(PGresult *res)
Definition: fe-exec.c:721
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
struct Latch * MyLatch
Definition: globals.c:62
static const JsonPathKeyword keywords[]
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition: latch.c:221
void ResetLatch(Latch *latch)
Definition: latch.c:372
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static PGresult * libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static PGconn * libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
static void libpqsrv_connect_prepare(void)
static PGresult * libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_disconnect(PGconn *conn)
static PGresult * libpqsrv_exec_params(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat, uint32 wait_event_info)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
@ CONNECTION_STARTED
Definition: libpq-fe.h:91
@ CONNECTION_BAD
Definition: libpq-fe.h:84
@ PGRES_COPY_IN
Definition: libpq-fe.h:131
@ PGRES_COPY_BOTH
Definition: libpq-fe.h:136
@ PGRES_COPY_OUT
Definition: libpq-fe.h:130
PostgresPollingStatusType
Definition: libpq-fe.h:113
@ PGRES_POLLING_OK
Definition: libpq-fe.h:117
@ PGRES_POLLING_READING
Definition: libpq-fe.h:115
@ PGRES_POLLING_WRITING
Definition: libpq-fe.h:116
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:114
char * pchomp(const char *in)
Definition: mcxt.c:1727
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
unsigned int Oid
Definition: postgres_ext.h:32
static void error(void)
Definition: sql-dyntest.c:147
PGconn * conn
Definition: streamutil.c:52
#define PG_WAIT_CLIENT
Definition: wait_event.h:22
#define WL_SOCKET_READABLE
Definition: waiteventset.h:35
#define WL_TIMEOUT
Definition: waiteventset.h:37
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define WL_SOCKET_CONNECTED
Definition: waiteventset.h:44
#define WL_SOCKET_WRITEABLE
Definition: waiteventset.h:36