PostgreSQL Source Code git master
Loading...
Searching...
No Matches
libpq-be-fe-helpers.h
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * libpq-be-fe-helpers.h
4 * Helper functions for using libpq in extensions
5 *
6 * Code built directly into the backend is not allowed to link to libpq
7 * directly. Extension code is allowed to use libpq however. However, libpq
8 * used in extensions has to be careful not to block inside libpq, otherwise
9 * interrupts will not be processed, leading to issues like unresolvable
10 * deadlocks. Backend code also needs to take care to acquire/release an
11 * external fd for the connection, otherwise fd.c's accounting of fd's is
12 * broken.
13 *
14 * This file provides helper functions to make it easier to comply with these
15 * rules. It is a header only library as it needs to be linked into each
16 * extension using libpq, and it seems too small to be worth adding a
17 * dedicated static library for.
18 *
19 * TODO: For historical reasons the connections established here are not put
20 * into non-blocking mode. That can lead to blocking even when only the async
21 * libpq functions are used. This should be fixed.
22 *
23 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
24 * Portions Copyright (c) 1994, Regents of the University of California
25 *
26 * src/include/libpq/libpq-be-fe-helpers.h
27 *
28 *-------------------------------------------------------------------------
29 */
30#ifndef LIBPQ_BE_FE_HELPERS_H
31#define LIBPQ_BE_FE_HELPERS_H
32
33#include "libpq/libpq-be-fe.h"
34#include "miscadmin.h"
35#include "storage/fd.h"
36#include "storage/latch.h"
37#include "utils/timestamp.h"
38#include "utils/wait_event.h"
39
40
41static inline void libpqsrv_connect_prepare(void);
42static inline void libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info);
43static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
44static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
45
46/*
47 * Start a connection using PQconnectStart().
48 *
49 * The returned connection has not yet completed its startup sequence. Callers
50 * may perform per-connection setup, such as installing a notice receiver,
51 * before calling libpqsrv_connect_complete().
52 *
53 * Callers must call libpqsrv_connect_complete(), even if this function returns
54 * NULL, because libpqsrv_connect_prepare() may already have reserved an
55 * external FD that must be released.
56 */
57static inline PGconn *
58libpqsrv_connect_start(const char *conninfo)
59{
61
62 return PQconnectStart(conninfo);
63}
64
65/*
66 * PQconnectdb() wrapper that reserves a file descriptor and processes
67 * interrupts during connection establishment.
68 *
69 * Throws an error if AcquireExternalFD() fails, but does not throw if
70 * connection establishment itself fails. Callers need to use PQstatus() to
71 * check if connection establishment succeeded.
72 */
73static inline PGconn *
74libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
75{
76 PGconn *conn;
77
78 conn = libpqsrv_connect_start(conninfo);
79
80 libpqsrv_connect_complete(conn, wait_event_info);
81
82 return conn;
83}
84
85/*
86 * Start a connection using PQconnectStartParams().
87 *
88 * See libpqsrv_connect_start() for the resource-lifetime rules.
89 */
90static inline PGconn *
92 const char *const *values,
93 int expand_dbname)
94{
96
98}
99
100/*
101 * Like libpqsrv_connect(), except that this is a wrapper for
102 * PQconnectdbParams().
103 */
104static inline PGconn *
106 const char *const *values,
107 int expand_dbname,
108 uint32 wait_event_info)
109{
110 PGconn *conn;
111
113
114 libpqsrv_connect_complete(conn, wait_event_info);
115
116 return conn;
117}
118
119/*
120 * PQfinish() wrapper that additionally releases the reserved file descriptor.
121 *
122 * It is allowed to call this with NULL only when the external FD reservation
123 * has already been released, for example after calling
124 * libpqsrv_connect_complete() with a NULL connection.
125 */
126static inline void
128{
129 /*
130 * If no connection was established, we haven't reserved an FD for it (or
131 * already released it). This rule makes it easier to write PG_CATCH()
132 * handlers for this facility's users.
133 *
134 * See also libpqsrv_connect_complete().
135 */
136 if (conn == NULL)
137 return;
138
140 PQfinish(conn);
141}
142
143
144/* lower-level connection helper functions follow */
145
146
147/*
148 * Helper function for all connection establishment functions.
149 */
150static inline void
152{
153 /*
154 * We must obey fd.c's limit on non-virtual file descriptors. Assume that
155 * a PGconn represents one long-lived FD. (Doing this here also ensures
156 * that VFDs are closed if needed to make room.)
157 */
158 if (!AcquireExternalFD())
159 {
160#ifndef WIN32 /* can't write #if within ereport() macro */
163 errmsg("could not establish connection"),
164 errdetail("There are too many open files on the local server."),
165 errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
166#else
169 errmsg("could not establish connection"),
170 errdetail("There are too many open files on the local server."),
171 errhint("Raise the server's \"max_files_per_process\" setting.")));
172#endif
173 }
174}
175
176/*
177 * Complete a connection started by libpqsrv_connect_start() or
178 * libpqsrv_connect_params_start().
179 */
180static inline void
182{
183 /*
184 * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
185 * that here.
186 */
187 if (conn == NULL)
188 {
190 return;
191 }
192
193 /*
194 * Can't wait without a socket. Note that we don't want to close the libpq
195 * connection yet, so callers can emit a useful error.
196 */
198 return;
199
200 /*
201 * WaitLatchOrSocket() can conceivably fail, handle that case here instead
202 * of requiring all callers to do so.
203 */
204 PG_TRY();
205 {
207
208 /*
209 * Poll connection until we have OK or FAILED status.
210 *
211 * Per spec for PQconnectPoll, first wait till socket is write-ready.
212 */
213 status = PGRES_POLLING_WRITING;
214 while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
215 {
216 int io_flag;
217 int rc;
218
219 if (status == PGRES_POLLING_READING)
221#ifdef WIN32
222
223 /*
224 * Windows needs a different test while waiting for
225 * connection-made
226 */
227 else if (PQstatus(conn) == CONNECTION_STARTED)
229#endif
230 else
232
235 PQsocket(conn),
236 0,
237 wait_event_info);
238
239 /* Interrupted? */
240 if (rc & WL_LATCH_SET)
241 {
244 }
245
246 /* If socket is ready, advance the libpq state machine */
247 if (rc & io_flag)
248 status = PQconnectPoll(conn);
249 }
250 }
251 PG_CATCH();
252 {
253 /*
254 * If an error is thrown here, the callers won't call
255 * libpqsrv_disconnect() with a conn, so release resources
256 * immediately.
257 */
259 PQfinish(conn);
260
261 PG_RE_THROW();
262 }
263 PG_END_TRY();
264}
265
266/*
267 * PQexec() wrapper that processes interrupts.
268 *
269 * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
270 * interrupts while pushing the query text to the server. Consider that
271 * setting if query strings can be long relative to TCP buffer size.
272 *
273 * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
274 * notably, PQexec() would silently discard any prior query results.
275 */
276static inline PGresult *
277libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
278{
279 if (!PQsendQuery(conn, query))
280 return NULL;
281 return libpqsrv_get_result_last(conn, wait_event_info);
282}
283
284/*
285 * PQexecParams() wrapper that processes interrupts.
286 *
287 * See notes at libpqsrv_exec().
288 */
289static inline PGresult *
291 const char *command,
292 int nParams,
293 const Oid *paramTypes,
294 const char *const *paramValues,
295 const int *paramLengths,
296 const int *paramFormats,
297 int resultFormat,
298 uint32 wait_event_info)
299{
300 if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
302 return NULL;
303 return libpqsrv_get_result_last(conn, wait_event_info);
304}
305
306/*
307 * Like PQexec(), loop over PQgetResult() until it returns NULL or another
308 * terminal state. Return the last non-NULL result or the terminal state.
309 */
310static inline PGresult *
312{
314
315 for (;;)
316 {
317 /* Wait for, and collect, the next PGresult. */
319
320 result = libpqsrv_get_result(conn, wait_event_info);
321 if (result == NULL)
322 break; /* query is complete, or failure */
323
324 /*
325 * Emulate PQexec()'s behavior of returning the last result when there
326 * are many.
327 */
330
335 break;
336 }
337 return lastResult;
338}
339
340/*
341 * Perform the equivalent of PQgetResult(), but watch for interrupts.
342 */
343static inline PGresult *
345{
346 /*
347 * Collect data until PQgetResult is ready to get the result without
348 * blocking.
349 */
350 while (PQisBusy(conn))
351 {
352 int rc;
353
357 PQsocket(conn),
358 0,
359 wait_event_info);
360
361 /* Interrupted? */
362 if (rc & WL_LATCH_SET)
363 {
366 }
367
368 /* Consume whatever data is available from the socket */
369 if (PQconsumeInput(conn) == 0)
370 {
371 /* trouble; expect PQgetResult() to return NULL */
372 break;
373 }
374 }
375
376 /* Now we can collect and return the next PGresult */
377 return PQgetResult(conn);
378}
379
380/*
381 * Submit a cancel request to the given connection, waiting only until
382 * the given time.
383 *
384 * We sleep interruptibly until we receive confirmation that the cancel
385 * request has been accepted, and if it is, return NULL; if the cancel
386 * request fails, return an error message string (which is not to be
387 * freed).
388 *
389 * For other problems (to wit: OOM when strdup'ing an error message from
390 * libpq), this function can ereport(ERROR).
391 *
392 * Note: this function leaks a string's worth of memory when reporting
393 * libpq errors. Make sure to call it in a transient memory context.
394 */
395static inline const char *
397{
399 const char *error = NULL;
400
402 if (cancel_conn == NULL)
403 return "out of memory";
404
405 /* In what follows, do not leak any PGcancelConn on any errors. */
406
407 PG_TRY();
408 {
410 {
412 goto exit;
413 }
414
415 for (;;)
416 {
419 long cur_timeout;
421
424 break; /* success! */
425
426 /* If timeout has expired, give up, else get sleep time. */
429 if (cur_timeout <= 0)
430 {
431 error = "cancel request timed out";
432 break;
433 }
434
435 switch (pollres)
436 {
439 break;
442 break;
443 default:
445 goto exit;
446 }
447
448 /* Sleep until there's something to do */
451
453
455 }
456exit: ;
457 }
458 PG_FINALLY();
459 {
461 }
462 PG_END_TRY();
463
464 return error;
465}
466
467/*
468 * libpqsrv_notice_receiver
469 *
470 * Custom notice receiver for libpq connections.
471 *
472 * This function is intended to be set via PQsetNoticeReceiver() so that
473 * NOTICE, WARNING, and similar messages from the connection are reported via
474 * ereport(), instead of being printed to stderr.
475 *
476 * Because this will be called from libpq with a "real" (not wrapped)
477 * PGresult, we need to temporarily ignore libpq-be-fe.h's wrapper macros
478 * for PGresult and also PQresultErrorMessage, and put back the wrappers
479 * afterwards. That's not pretty, but there seems no better alternative.
480 */
481#undef PGresult
482#undef PQresultErrorMessage
483
484static inline void
486{
487 const char *message;
488 int len;
489 const char *prefix = (const char *) arg;
490
491 /*
492 * Trim the trailing newline from the message text returned from
493 * PQresultErrorMessage(), as it always includes one, to produce cleaner
494 * log output.
495 */
496 message = PQresultErrorMessage(res);
497 len = strlen(message);
498 if (len > 0 && message[len - 1] == '\n')
499 len--;
500
501 ereport(LOG,
502 errmsg_internal("%s: %.*s", prefix, len, message));
503}
504
505#define PGresult libpqsrv_PGresult
506#define PQresultErrorMessage libpqsrv_PQresultErrorMessage
507
508#endif /* LIBPQ_BE_FE_HELPERS_H */
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1765
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1649
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1613
static Datum values[MAXATTR]
Definition bootstrap.c:190
uint32_t uint32
Definition c.h:624
uint32 result
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1323
int errcode(int sqlerrcode)
Definition elog.c:875
#define LOG
Definition elog.h:32
#define PG_RE_THROW()
Definition elog.h:407
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define PG_TRY(...)
Definition elog.h:374
#define PG_END_TRY(...)
Definition elog.h:399
#define ERROR
Definition elog.h:40
#define PG_CATCH(...)
Definition elog.h:384
#define PG_FINALLY(...)
Definition elog.h:391
#define ereport(elevel,...)
Definition elog.h:152
void ReleaseExternalFD(void)
Definition fd.c:1225
bool AcquireExternalFD(void)
Definition fd.c:1172
PGcancelConn * PQcancelCreate(PGconn *conn)
Definition fe-cancel.c:68
PostgresPollingStatusType PQcancelPoll(PGcancelConn *cancelConn)
Definition fe-cancel.c:226
void PQcancelFinish(PGcancelConn *cancelConn)
Definition fe-cancel.c:353
int PQcancelSocket(const PGcancelConn *cancelConn)
Definition fe-cancel.c:313
char * PQcancelErrorMessage(const PGcancelConn *cancelConn)
Definition fe-cancel.c:325
int PQcancelStart(PGcancelConn *cancelConn)
Definition fe-cancel.c:204
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
PGconn * PQconnectStart(const char *conninfo)
Definition fe-connect.c:958
PGconn * PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname)
Definition fe-connect.c:877
void PQfinish(PGconn *conn)
int PQsocket(const PGconn *conn)
int PQsendQueryParams(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat)
Definition fe-exec.c:1509
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
int PQsendQuery(PGconn *conn, const char *query)
Definition fe-exec.c:1433
int PQisBusy(PGconn *conn)
Definition fe-exec.c:2048
struct Latch * MyLatch
Definition globals.c:65
static const JsonPathKeyword keywords[]
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
Definition latch.c:223
void ResetLatch(Latch *latch)
Definition latch.c:374
static const char * libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
static PGresult * libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
static PGconn * libpqsrv_connect_params(const char *const *keywords, const char *const *values, int expand_dbname, uint32 wait_event_info)
static void libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info)
static PGconn * libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
static void libpqsrv_connect_prepare(void)
static PGresult * libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
static void libpqsrv_notice_receiver(void *arg, const PGresult *res)
static PGconn * libpqsrv_connect_start(const char *conninfo)
static void libpqsrv_disconnect(PGconn *conn)
#define PQresultErrorMessage
static PGresult * libpqsrv_exec_params(PGconn *conn, const char *command, int nParams, const Oid *paramTypes, const char *const *paramValues, const int *paramLengths, const int *paramFormats, int resultFormat, uint32 wait_event_info)
static PGresult * libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
static PGconn * libpqsrv_connect_params_start(const char *const *keywords, const char *const *values, int expand_dbname)
#define PQgetResult
#define PQclear
#define PQresultStatus
@ CONNECTION_STARTED
Definition libpq-fe.h:98
@ CONNECTION_BAD
Definition libpq-fe.h:91
@ PGRES_COPY_IN
Definition libpq-fe.h:138
@ PGRES_COPY_BOTH
Definition libpq-fe.h:143
@ PGRES_COPY_OUT
Definition libpq-fe.h:137
PostgresPollingStatusType
Definition libpq-fe.h:120
@ PGRES_POLLING_OK
Definition libpq-fe.h:124
@ PGRES_POLLING_READING
Definition libpq-fe.h:122
@ PGRES_POLLING_WRITING
Definition libpq-fe.h:123
@ PGRES_POLLING_FAILED
Definition libpq-fe.h:121
char * pchomp(const char *in)
Definition mcxt.c:1938
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
static char * errmsg
const void size_t len
unsigned int Oid
static int fb(int x)
static void error(void)
PGconn * conn
Definition streamutil.c:52
#define PG_WAIT_CLIENT
#define WL_SOCKET_READABLE
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define WL_SOCKET_CONNECTED
#define WL_SOCKET_WRITEABLE