PostgreSQL Source Code git master
parallel_slot.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * parallel_slot.c
4 * Parallel support for front-end parallel database connections
5 *
6 *
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * src/fe_utils/parallel_slot.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15#if defined(WIN32) && FD_SETSIZE < 1024
16#error FD_SETSIZE needs to have been increased
17#endif
18
#include "postgres_fe.h"

#include <sys/select.h>

#include "common/logging.h"
#include "fe_utils/cancel.h"
#include "fe_utils/parallel_slot.h"
#include "fe_utils/query_utils.h"
27
28#define ERRCODE_UNDEFINED_TABLE "42P01"
29
30static int select_loop(int maxFd, fd_set *workerset);
31static bool processQueryResult(ParallelSlot *slot, PGresult *result);
32
33/*
34 * Process (and delete) a query result. Returns true if there's no problem,
35 * false otherwise. It's up to the handler to decide what constitutes a
36 * problem.
37 */
38static bool
40{
41 Assert(slot->handler != NULL);
42
43 /* On failure, the handler should return NULL after freeing the result */
44 if (!slot->handler(result, slot->connection, slot->handler_context))
45 return false;
46
47 /* Ok, we have to free it ourself */
48 PQclear(result);
49 return true;
50}
51
52/*
53 * Consume all the results generated for the given connection until
54 * nothing remains. If at least one error is encountered, return false.
55 * Note that this will block if the connection is busy.
56 */
57static bool
59{
60 bool ok = true;
61 PGresult *result;
62
64 while ((result = PQgetResult(slot->connection)) != NULL)
65 {
66 if (!processQueryResult(slot, result))
67 ok = false;
68 }
70 return ok;
71}
72
73/*
74 * Wait until a file descriptor from the given set becomes readable.
75 *
76 * Returns the number of ready descriptors, or -1 on failure (including
77 * getting a cancel request).
78 */
79static int
80select_loop(int maxFd, fd_set *workerset)
81{
82 int i;
83 fd_set saveSet = *workerset;
84
86 return -1;
87
88 for (;;)
89 {
90 /*
91 * On Windows, we need to check once in a while for cancel requests;
92 * on other platforms we rely on select() returning when interrupted.
93 */
94 struct timeval *tvp;
95#ifdef WIN32
96 struct timeval tv = {0, 1000000};
97
98 tvp = &tv;
99#else
100 tvp = NULL;
101#endif
102
103 *workerset = saveSet;
104 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105
106#ifdef WIN32
107 if (i == SOCKET_ERROR)
108 {
109 i = -1;
110
111 if (WSAGetLastError() == WSAEINTR)
112 errno = EINTR;
113 }
114#endif
115
116 if (i < 0 && errno == EINTR)
117 continue; /* ignore this */
118 if (i < 0 || CancelRequested)
119 return -1; /* but not this */
120 if (i == 0)
121 continue; /* timeout (Win32 only) */
122 break;
123 }
124
125 return i;
126}
127
128/*
129 * Return the offset of a suitable idle slot, or -1 if none are available. If
130 * the given dbname is not null, only idle slots connected to the given
131 * database are considered suitable, otherwise all idle connected slots are
132 * considered suitable.
133 */
134static int
136{
137 int i;
138
139 for (i = 0; i < sa->numslots; i++)
140 {
141 if (sa->slots[i].inUse)
142 continue;
143
144 if (sa->slots[i].connection == NULL)
145 continue;
146
147 if (dbname == NULL ||
148 strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149 return i;
150 }
151 return -1;
152}
153
154/*
155 * Return the offset of the first slot without a database connection, or -1 if
156 * all slots are connected.
157 */
158static int
160{
161 int i;
162
163 for (i = 0; i < sa->numslots; i++)
164 {
165 if (sa->slots[i].inUse)
166 continue;
167
168 if (sa->slots[i].connection == NULL)
169 return i;
170 }
171
172 return -1;
173}
174
175/*
176 * Return the offset of the first idle slot, or -1 if all slots are busy.
177 */
178static int
180{
181 int i;
182
183 for (i = 0; i < sa->numslots; i++)
184 if (!sa->slots[i].inUse)
185 return i;
186
187 return -1;
188}
189
190/*
191 * Wait for any slot's connection to have query results, consume the results,
192 * and update the slot's status as appropriate. Returns true on success,
193 * false on cancellation, on error, or if no slots are connected.
194 */
195static bool
197{
198 int i;
199 fd_set slotset;
200 int maxFd = 0;
201 PGconn *cancelconn = NULL;
202
203 /* We must reconstruct the fd_set for each call to select_loop */
204 FD_ZERO(&slotset);
205
206 for (i = 0; i < sa->numslots; i++)
207 {
208 int sock;
209
210 /* We shouldn't get here if we still have slots without connections */
211 Assert(sa->slots[i].connection != NULL);
212
213 sock = PQsocket(sa->slots[i].connection);
214
215 /*
216 * We don't really expect any connections to lose their sockets after
217 * startup, but just in case, cope by ignoring them.
218 */
219 if (sock < 0)
220 continue;
221
222 /* Keep track of the first valid connection we see. */
223 if (cancelconn == NULL)
224 cancelconn = sa->slots[i].connection;
225
226 FD_SET(sock, &slotset);
227 if (sock > maxFd)
228 maxFd = sock;
229 }
230
231 /*
232 * If we get this far with no valid connections, processing cannot
233 * continue.
234 */
235 if (cancelconn == NULL)
236 return false;
237
238 SetCancelConn(cancelconn);
239 i = select_loop(maxFd, &slotset);
241
242 /* failure? */
243 if (i < 0)
244 return false;
245
246 for (i = 0; i < sa->numslots; i++)
247 {
248 int sock;
249
250 sock = PQsocket(sa->slots[i].connection);
251
252 if (sock >= 0 && FD_ISSET(sock, &slotset))
253 {
254 /* select() says input is available, so consume it */
255 PQconsumeInput(sa->slots[i].connection);
256 }
257
258 /* Collect result(s) as long as any are available */
259 while (!PQisBusy(sa->slots[i].connection))
260 {
261 PGresult *result = PQgetResult(sa->slots[i].connection);
262
263 if (result != NULL)
264 {
265 /* Handle and discard the command result */
266 if (!processQueryResult(&sa->slots[i], result))
267 return false;
268 }
269 else
270 {
271 /* This connection has become idle */
272 ParallelSlotSetIdle(&sa->slots[i]);
273 break;
274 }
275 }
276 }
277 return true;
278}
279
280/*
281 * Open a new database connection using the stored connection parameters and
282 * optionally a given dbname if not null, execute the stored initial command if
283 * any, and associate the new connection with the given slot.
284 */
285static void
286connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
287{
288 const char *old_override;
289 ParallelSlot *slot = &sa->slots[slotno];
290
291 old_override = sa->cparams->override_dbname;
292 if (dbname)
293 sa->cparams->override_dbname = dbname;
294 slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
295 sa->cparams->override_dbname = old_override;
296
297 /*
298 * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
299 * FD_SET() and allied macros. Windows defines it as a ceiling on the
300 * count of file descriptors in the set, not a ceiling on the value of
301 * each file descriptor; see
302 * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
303 * and
304 * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
305 * We can't ignore that, because Windows starts file descriptors at a
306 * higher value, delays reuse, and skips values. With less than ten
307 * concurrent file descriptors, opened and closed rapidly, one can reach
308 * file descriptor 1024.
309 *
310 * Doing a hard exit here is a bit grotty, but it doesn't seem worth
311 * complicating the API to make it less grotty.
312 */
313#ifdef WIN32
314 if (slotno >= FD_SETSIZE)
315 {
316 pg_log_error("too many jobs for this platform: %d", slotno);
317 exit(1);
318 }
319#else
320 {
321 int fd = PQsocket(slot->connection);
322
323 if (fd >= FD_SETSIZE)
324 {
325 pg_log_error("socket file descriptor out of range for select(): %d",
326 fd);
327 pg_log_error_hint("Try fewer jobs.");
328 exit(1);
329 }
330 }
331#endif
332
333 /* Setup the connection using the supplied command, if any. */
334 if (sa->initcmd)
335 executeCommand(slot->connection, sa->initcmd, sa->echo);
336}
337
338/*
339 * ParallelSlotsGetIdle
340 * Return a connection slot that is ready to execute a command.
341 *
342 * The slot returned is chosen as follows:
343 *
344 * If any idle slot already has an open connection, and if either dbname is
345 * null or the existing connection is to the given database, that slot will be
346 * returned allowing the connection to be reused.
347 *
348 * Otherwise, if any idle slot is not yet connected to any database, the slot
349 * will be returned with its connection opened using the stored cparams and
350 * optionally the given dbname if not null.
351 *
352 * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
353 * after having its connection disconnected and reconnected using the stored
354 * cparams and optionally the given dbname if not null.
355 *
356 * Otherwise, if any slots have connections that are busy, we loop on select()
357 * until one socket becomes available. When this happens, we read the whole
358 * set and mark as free all sockets that become available. We then select a
359 * slot using the same rules as above.
360 *
361 * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
362 *
363 * For any connection created, if the stored initcmd is not null, it will be
364 * executed as a command on the newly formed connection before the slot is
365 * returned.
366 *
367 * If an error occurs, NULL is returned.
368 */
371{
372 int offset;
373
374 Assert(sa);
375 Assert(sa->numslots > 0);
376
377 while (1)
378 {
379 /* First choice: a slot already connected to the desired database. */
381 if (offset >= 0)
382 {
383 sa->slots[offset].inUse = true;
384 return &sa->slots[offset];
385 }
386
387 /* Second choice: a slot not connected to any database. */
388 offset = find_unconnected_slot(sa);
389 if (offset >= 0)
390 {
391 connect_slot(sa, offset, dbname);
392 sa->slots[offset].inUse = true;
393 return &sa->slots[offset];
394 }
395
396 /* Third choice: a slot connected to the wrong database. */
397 offset = find_any_idle_slot(sa);
398 if (offset >= 0)
399 {
400 disconnectDatabase(sa->slots[offset].connection);
401 sa->slots[offset].connection = NULL;
402 connect_slot(sa, offset, dbname);
403 sa->slots[offset].inUse = true;
404 return &sa->slots[offset];
405 }
406
407 /*
408 * Fourth choice: block until one or more slots become available. If
409 * any slots hit a fatal error, we'll find out about that here and
410 * return NULL.
411 */
412 if (!wait_on_slots(sa))
413 return NULL;
414 }
415}
416
417/*
418 * ParallelSlotsSetup
419 * Prepare a set of parallel slots but do not connect to any database.
420 *
421 * This creates and initializes a set of slots, marking all parallel slots as
422 * free and ready to use. Establishing connections is delayed until requesting
423 * a free slot. The cparams, progname, echo, and initcmd are stored for later
424 * use and must remain valid for the lifetime of the returned array.
425 */
427ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
428 bool echo, const char *initcmd)
429{
431
432 Assert(numslots > 0);
433 Assert(cparams != NULL);
434 Assert(progname != NULL);
435
436 sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
437 numslots * sizeof(ParallelSlot));
438
439 sa->numslots = numslots;
440 sa->cparams = cparams;
441 sa->progname = progname;
442 sa->echo = echo;
443 sa->initcmd = initcmd;
444
445 return sa;
446}
447
448/*
449 * ParallelSlotsAdoptConn
450 * Assign an open connection to the slots array for reuse.
451 *
452 * This turns over ownership of an open connection to a slots array. The
453 * caller should not further use or close the connection. All the connection's
454 * parameters (user, host, port, etc.) except possibly dbname should match
455 * those of the slots array's cparams, as given in ParallelSlotsSetup. If
456 * these parameters differ, subsequent behavior is undefined.
457 */
458void
460{
461 int offset;
462
463 offset = find_unconnected_slot(sa);
464 if (offset >= 0)
465 sa->slots[offset].connection = conn;
466 else
468}
469
470/*
471 * ParallelSlotsTerminate
472 * Clean up a set of parallel slots
473 *
474 * Iterate through all connections in a given set of ParallelSlots and
475 * terminate all connections.
476 */
477void
479{
480 int i;
481
482 for (i = 0; i < sa->numslots; i++)
483 {
484 PGconn *conn = sa->slots[i].connection;
485
486 if (conn == NULL)
487 continue;
488
490 }
491}
492
493/*
494 * ParallelSlotsWaitCompletion
495 *
496 * Wait for all connections to finish, returning false if at least one
497 * error has been found on the way.
498 */
499bool
501{
502 int i;
503
504 for (i = 0; i < sa->numslots; i++)
505 {
506 if (sa->slots[i].connection == NULL)
507 continue;
508 if (!consumeQueryResult(&sa->slots[i]))
509 return false;
510 /* Mark connection as idle */
511 ParallelSlotSetIdle(&sa->slots[i]);
512 }
513
514 return true;
515}
516
517/*
518 * TableCommandResultHandler
519 *
520 * ParallelSlotResultHandler for results of commands (not queries) against
521 * tables.
522 *
523 * Requires that the result status is either PGRES_COMMAND_OK or an error about
524 * a missing table. This is useful for utilities that compile a list of tables
525 * to process and then run commands (vacuum, reindex, or whatever) against
526 * those tables, as there is a race condition between the time the list is
527 * compiled and the time the command attempts to open the table.
528 *
529 * For missing tables, logs an error but allows processing to continue.
530 *
531 * For all other errors, logs an error and terminates further processing.
532 *
533 * res: PGresult from the query executed on the slot's connection
534 * conn: connection belonging to the slot
535 * context: unused
536 */
537bool
539{
540 Assert(res != NULL);
541 Assert(conn != NULL);
542
543 /*
544 * If it's an error, report it. Errors about a missing table are harmless
545 * so we continue processing; but die for other errors.
546 */
548 {
549 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
550
551 pg_log_error("processing of database \"%s\" failed: %s",
553
554 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
555 {
556 PQclear(res);
557 return false;
558 }
559 }
560
561 return true;
562}
struct ParallelSlot ParallelSlot
Definition: parallel.h:52
volatile sig_atomic_t CancelRequested
Definition: cancel.c:59
void ResetCancelConn(void)
Definition: cancel.c:107
void SetCancelConn(PGconn *conn)
Definition: cancel.c:77
void disconnectDatabase(PGconn *conn)
PGconn * connectDatabase(const ConnParams *cparams, const char *progname, bool echo, bool fail_ok, bool allow_password_reuse)
Definition: connect_utils.c:32
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:7538
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7704
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7730
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2001
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2048
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
#define PQgetResult
Definition: libpq-be-fe.h:246
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultErrorField
Definition: libpq-be-fe.h:249
#define PQresultStatus
Definition: libpq-be-fe.h:247
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
const char * progname
Definition: main.c:44
void * palloc0(Size size)
Definition: mcxt.c:1395
static bool wait_on_slots(ParallelSlotArray *sa)
ParallelSlotArray * ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel_slot.c:80
bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
#define ERRCODE_UNDEFINED_TABLE
Definition: parallel_slot.c:28
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static bool consumeQueryResult(ParallelSlot *slot)
Definition: parallel_slot.c:58
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
Definition: parallel_slot.c:39
ParallelSlot * ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
void ParallelSlotsTerminate(ParallelSlotArray *sa)
static int find_unconnected_slot(const ParallelSlotArray *sa)
void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
static int find_any_idle_slot(const ParallelSlotArray *sa)
static void ParallelSlotSetIdle(ParallelSlot *slot)
Definition: parallel_slot.h:62
static void executeCommand(PGconn *conn, const char *query)
Definition: pg_dumpall.c:1786
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * dbname
Definition: streamutil.c:49
PGconn * conn
Definition: streamutil.c:52
ParallelSlotResultHandler handler
Definition: parallel_slot.h:32
PGconn * connection
Definition: parallel_slot.h:23
void * handler_context
Definition: parallel_slot.h:33
#define EINTR
Definition: win32_port.h:361
#define select(n, r, w, e, timeout)
Definition: win32_port.h:500