PostgreSQL Source Code  git master
parallel_slot.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * parallel_slot.c
4  * Parallel support for front-end parallel database connections
5  *
6  *
7  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/fe_utils/parallel_slot.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #if defined(WIN32) && FD_SETSIZE < 1024
16 #error FD_SETSIZE needs to have been increased
17 #endif
18 
19 #include "postgres_fe.h"
20 
21 #include <sys/select.h>
22 
23 #include "common/logging.h"
24 #include "fe_utils/cancel.h"
25 #include "fe_utils/parallel_slot.h"
26 #include "fe_utils/query_utils.h"
27 
28 #define ERRCODE_UNDEFINED_TABLE "42P01"
29 
30 static int select_loop(int maxFd, fd_set *workerset);
31 static bool processQueryResult(ParallelSlot *slot, PGresult *result);
32 
33 /*
34  * Process (and delete) a query result. Returns true if there's no problem,
35  * false otherwise. It's up to the handler to decide what constitutes a
36  * problem.
37  */
38 static bool
40 {
41  Assert(slot->handler != NULL);
42 
43  /* On failure, the handler should return NULL after freeing the result */
44  if (!slot->handler(result, slot->connection, slot->handler_context))
45  return false;
46 
47  /* Ok, we have to free it ourself */
48  PQclear(result);
49  return true;
50 }
51 
52 /*
53  * Consume all the results generated for the given connection until
54  * nothing remains. If at least one error is encountered, return false.
55  * Note that this will block if the connection is busy.
56  */
57 static bool
59 {
60  bool ok = true;
61  PGresult *result;
62 
64  while ((result = PQgetResult(slot->connection)) != NULL)
65  {
66  if (!processQueryResult(slot, result))
67  ok = false;
68  }
70  return ok;
71 }
72 
73 /*
74  * Wait until a file descriptor from the given set becomes readable.
75  *
76  * Returns the number of ready descriptors, or -1 on failure (including
77  * getting a cancel request).
78  */
79 static int
80 select_loop(int maxFd, fd_set *workerset)
81 {
82  int i;
83  fd_set saveSet = *workerset;
84 
85  if (CancelRequested)
86  return -1;
87 
88  for (;;)
89  {
90  /*
91  * On Windows, we need to check once in a while for cancel requests;
92  * on other platforms we rely on select() returning when interrupted.
93  */
94  struct timeval *tvp;
95 #ifdef WIN32
96  struct timeval tv = {0, 1000000};
97 
98  tvp = &tv;
99 #else
100  tvp = NULL;
101 #endif
102 
103  *workerset = saveSet;
104  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105 
106 #ifdef WIN32
107  if (i == SOCKET_ERROR)
108  {
109  i = -1;
110 
111  if (WSAGetLastError() == WSAEINTR)
112  errno = EINTR;
113  }
114 #endif
115 
116  if (i < 0 && errno == EINTR)
117  continue; /* ignore this */
118  if (i < 0 || CancelRequested)
119  return -1; /* but not this */
120  if (i == 0)
121  continue; /* timeout (Win32 only) */
122  break;
123  }
124 
125  return i;
126 }
127 
128 /*
129  * Return the offset of a suitable idle slot, or -1 if none are available. If
130  * the given dbname is not null, only idle slots connected to the given
131  * database are considered suitable, otherwise all idle connected slots are
132  * considered suitable.
133  */
134 static int
136 {
137  int i;
138 
139  for (i = 0; i < sa->numslots; i++)
140  {
141  if (sa->slots[i].inUse)
142  continue;
143 
144  if (sa->slots[i].connection == NULL)
145  continue;
146 
147  if (dbname == NULL ||
148  strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149  return i;
150  }
151  return -1;
152 }
153 
154 /*
155  * Return the offset of the first slot without a database connection, or -1 if
156  * all slots are connected.
157  */
158 static int
160 {
161  int i;
162 
163  for (i = 0; i < sa->numslots; i++)
164  {
165  if (sa->slots[i].inUse)
166  continue;
167 
168  if (sa->slots[i].connection == NULL)
169  return i;
170  }
171 
172  return -1;
173 }
174 
175 /*
176  * Return the offset of the first idle slot, or -1 if all slots are busy.
177  */
178 static int
180 {
181  int i;
182 
183  for (i = 0; i < sa->numslots; i++)
184  if (!sa->slots[i].inUse)
185  return i;
186 
187  return -1;
188 }
189 
190 /*
191  * Wait for any slot's connection to have query results, consume the results,
192  * and update the slot's status as appropriate. Returns true on success,
193  * false on cancellation, on error, or if no slots are connected.
194  */
195 static bool
197 {
198  int i;
199  fd_set slotset;
200  int maxFd = 0;
201  PGconn *cancelconn = NULL;
202 
203  /* We must reconstruct the fd_set for each call to select_loop */
204  FD_ZERO(&slotset);
205 
206  for (i = 0; i < sa->numslots; i++)
207  {
208  int sock;
209 
210  /* We shouldn't get here if we still have slots without connections */
211  Assert(sa->slots[i].connection != NULL);
212 
213  sock = PQsocket(sa->slots[i].connection);
214 
215  /*
216  * We don't really expect any connections to lose their sockets after
217  * startup, but just in case, cope by ignoring them.
218  */
219  if (sock < 0)
220  continue;
221 
222  /* Keep track of the first valid connection we see. */
223  if (cancelconn == NULL)
224  cancelconn = sa->slots[i].connection;
225 
226  FD_SET(sock, &slotset);
227  if (sock > maxFd)
228  maxFd = sock;
229  }
230 
231  /*
232  * If we get this far with no valid connections, processing cannot
233  * continue.
234  */
235  if (cancelconn == NULL)
236  return false;
237 
238  SetCancelConn(cancelconn);
239  i = select_loop(maxFd, &slotset);
240  ResetCancelConn();
241 
242  /* failure? */
243  if (i < 0)
244  return false;
245 
246  for (i = 0; i < sa->numslots; i++)
247  {
248  int sock;
249 
250  sock = PQsocket(sa->slots[i].connection);
251 
252  if (sock >= 0 && FD_ISSET(sock, &slotset))
253  {
254  /* select() says input is available, so consume it */
255  PQconsumeInput(sa->slots[i].connection);
256  }
257 
258  /* Collect result(s) as long as any are available */
259  while (!PQisBusy(sa->slots[i].connection))
260  {
261  PGresult *result = PQgetResult(sa->slots[i].connection);
262 
263  if (result != NULL)
264  {
265  /* Handle and discard the command result */
266  if (!processQueryResult(&sa->slots[i], result))
267  return false;
268  }
269  else
270  {
271  /* This connection has become idle */
272  sa->slots[i].inUse = false;
273  ParallelSlotClearHandler(&sa->slots[i]);
274  break;
275  }
276  }
277  }
278  return true;
279 }
280 
281 /*
282  * Open a new database connection using the stored connection parameters and
283  * optionally a given dbname if not null, execute the stored initial command if
284  * any, and associate the new connection with the given slot.
285  */
286 static void
287 connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
288 {
289  const char *old_override;
290  ParallelSlot *slot = &sa->slots[slotno];
291 
292  old_override = sa->cparams->override_dbname;
293  if (dbname)
294  sa->cparams->override_dbname = dbname;
295  slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
296  sa->cparams->override_dbname = old_override;
297 
298  /*
299  * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
300  * FD_SET() and allied macros. Windows defines it as a ceiling on the
301  * count of file descriptors in the set, not a ceiling on the value of
302  * each file descriptor; see
303  * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
304  * and
305  * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
306  * We can't ignore that, because Windows starts file descriptors at a
307  * higher value, delays reuse, and skips values. With less than ten
308  * concurrent file descriptors, opened and closed rapidly, one can reach
309  * file descriptor 1024.
310  *
311  * Doing a hard exit here is a bit grotty, but it doesn't seem worth
312  * complicating the API to make it less grotty.
313  */
314 #ifdef WIN32
315  if (slotno >= FD_SETSIZE)
316  {
317  pg_log_error("too many jobs for this platform: %d", slotno);
318  exit(1);
319  }
320 #else
321  {
322  int fd = PQsocket(slot->connection);
323 
324  if (fd >= FD_SETSIZE)
325  {
326  pg_log_error("socket file descriptor out of range for select(): %d",
327  fd);
328  pg_log_error_hint("Try fewer jobs.");
329  exit(1);
330  }
331  }
332 #endif
333 
334  /* Setup the connection using the supplied command, if any. */
335  if (sa->initcmd)
336  executeCommand(slot->connection, sa->initcmd, sa->echo);
337 }
338 
339 /*
340  * ParallelSlotsGetIdle
341  * Return a connection slot that is ready to execute a command.
342  *
343  * The slot returned is chosen as follows:
344  *
345  * If any idle slot already has an open connection, and if either dbname is
346  * null or the existing connection is to the given database, that slot will be
347  * returned allowing the connection to be reused.
348  *
349  * Otherwise, if any idle slot is not yet connected to any database, the slot
350  * will be returned with it's connection opened using the stored cparams and
351  * optionally the given dbname if not null.
352  *
353  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
354  * after having it's connection disconnected and reconnected using the stored
355  * cparams and optionally the given dbname if not null.
356  *
357  * Otherwise, if any slots have connections that are busy, we loop on select()
358  * until one socket becomes available. When this happens, we read the whole
359  * set and mark as free all sockets that become available. We then select a
360  * slot using the same rules as above.
361  *
362  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
363  *
364  * For any connection created, if the stored initcmd is not null, it will be
365  * executed as a command on the newly formed connection before the slot is
366  * returned.
367  *
368  * If an error occurs, NULL is returned.
369  */
370 ParallelSlot *
372 {
373  int offset;
374 
375  Assert(sa);
376  Assert(sa->numslots > 0);
377 
378  while (1)
379  {
380  /* First choice: a slot already connected to the desired database. */
381  offset = find_matching_idle_slot(sa, dbname);
382  if (offset >= 0)
383  {
384  sa->slots[offset].inUse = true;
385  return &sa->slots[offset];
386  }
387 
388  /* Second choice: a slot not connected to any database. */
389  offset = find_unconnected_slot(sa);
390  if (offset >= 0)
391  {
392  connect_slot(sa, offset, dbname);
393  sa->slots[offset].inUse = true;
394  return &sa->slots[offset];
395  }
396 
397  /* Third choice: a slot connected to the wrong database. */
398  offset = find_any_idle_slot(sa);
399  if (offset >= 0)
400  {
401  disconnectDatabase(sa->slots[offset].connection);
402  sa->slots[offset].connection = NULL;
403  connect_slot(sa, offset, dbname);
404  sa->slots[offset].inUse = true;
405  return &sa->slots[offset];
406  }
407 
408  /*
409  * Fourth choice: block until one or more slots become available. If
410  * any slots hit a fatal error, we'll find out about that here and
411  * return NULL.
412  */
413  if (!wait_on_slots(sa))
414  return NULL;
415  }
416 }
417 
418 /*
419  * ParallelSlotsSetup
420  * Prepare a set of parallel slots but do not connect to any database.
421  *
422  * This creates and initializes a set of slots, marking all parallel slots as
423  * free and ready to use. Establishing connections is delayed until requesting
424  * a free slot. The cparams, progname, echo, and initcmd are stored for later
425  * use and must remain valid for the lifetime of the returned array.
426  */
428 ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
429  bool echo, const char *initcmd)
430 {
432 
433  Assert(numslots > 0);
434  Assert(cparams != NULL);
435  Assert(progname != NULL);
436 
437  sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
438  numslots * sizeof(ParallelSlot));
439 
440  sa->numslots = numslots;
441  sa->cparams = cparams;
442  sa->progname = progname;
443  sa->echo = echo;
444  sa->initcmd = initcmd;
445 
446  return sa;
447 }
448 
449 /*
450  * ParallelSlotsAdoptConn
451  * Assign an open connection to the slots array for reuse.
452  *
453  * This turns over ownership of an open connection to a slots array. The
454  * caller should not further use or close the connection. All the connection's
455  * parameters (user, host, port, etc.) except possibly dbname should match
456  * those of the slots array's cparams, as given in ParallelSlotsSetup. If
457  * these parameters differ, subsequent behavior is undefined.
458  */
459 void
461 {
462  int offset;
463 
464  offset = find_unconnected_slot(sa);
465  if (offset >= 0)
466  sa->slots[offset].connection = conn;
467  else
469 }
470 
471 /*
472  * ParallelSlotsTerminate
473  * Clean up a set of parallel slots
474  *
475  * Iterate through all connections in a given set of ParallelSlots and
476  * terminate all connections.
477  */
478 void
480 {
481  int i;
482 
483  for (i = 0; i < sa->numslots; i++)
484  {
485  PGconn *conn = sa->slots[i].connection;
486 
487  if (conn == NULL)
488  continue;
489 
491  }
492 }
493 
494 /*
495  * ParallelSlotsWaitCompletion
496  *
497  * Wait for all connections to finish, returning false if at least one
498  * error has been found on the way.
499  */
500 bool
502 {
503  int i;
504 
505  for (i = 0; i < sa->numslots; i++)
506  {
507  if (sa->slots[i].connection == NULL)
508  continue;
509  if (!consumeQueryResult(&sa->slots[i]))
510  return false;
511  /* Mark connection as idle */
512  sa->slots[i].inUse = false;
513  ParallelSlotClearHandler(&sa->slots[i]);
514  }
515 
516  return true;
517 }
518 
519 /*
520  * TableCommandResultHandler
521  *
522  * ParallelSlotResultHandler for results of commands (not queries) against
523  * tables.
524  *
525  * Requires that the result status is either PGRES_COMMAND_OK or an error about
526  * a missing table. This is useful for utilities that compile a list of tables
527  * to process and then run commands (vacuum, reindex, or whatever) against
528  * those tables, as there is a race condition between the time the list is
529  * compiled and the time the command attempts to open the table.
530  *
531  * For missing tables, logs an error but allows processing to continue.
532  *
533  * For all other errors, logs an error and terminates further processing.
534  *
535  * res: PGresult from the query executed on the slot's connection
536  * conn: connection belonging to the slot
537  * context: unused
538  */
539 bool
541 {
542  Assert(res != NULL);
543  Assert(conn != NULL);
544 
545  /*
546  * If it's an error, report it. Errors about a missing table are harmless
547  * so we continue processing; but die for other errors.
548  */
550  {
551  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
552 
553  pg_log_error("processing of database \"%s\" failed: %s",
555 
556  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
557  {
558  PQclear(res);
559  return false;
560  }
561  }
562 
563  return true;
564 }
struct ParallelSlot ParallelSlot
Definition: parallel.h:52
#define Assert(condition)
Definition: c.h:812
volatile sig_atomic_t CancelRequested
Definition: cancel.c:59
void ResetCancelConn(void)
Definition: cancel.c:107
void SetCancelConn(PGconn *conn)
Definition: cancel.c:77
void disconnectDatabase(PGconn *conn)
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:7036
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7200
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7226
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
int i
Definition: isn.c:72
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
const char * progname
Definition: main.c:44
void * palloc0(Size size)
Definition: mcxt.c:1347
static bool wait_on_slots(ParallelSlotArray *sa)
ParallelSlot * ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
ParallelSlotArray * ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel_slot.c:80
bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
#define ERRCODE_UNDEFINED_TABLE
Definition: parallel_slot.c:28
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static bool consumeQueryResult(ParallelSlot *slot)
Definition: parallel_slot.c:58
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
Definition: parallel_slot.c:39
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
void ParallelSlotsTerminate(ParallelSlotArray *sa)
static int find_unconnected_slot(const ParallelSlotArray *sa)
void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
static int find_any_idle_slot(const ParallelSlotArray *sa)
static void ParallelSlotClearHandler(ParallelSlot *slot)
Definition: parallel_slot.h:55
static PGconn * connectDatabase(const char *dbname, const char *connection_string, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1663
static void executeCommand(PGconn *conn, const char *query)
Definition: pg_dumpall.c:1906
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56
static int fd(const char *x, int i)
Definition: preproc-init.c:105
tree context
Definition: radixtree.h:1837
char * dbname
Definition: streamutil.c:50
PGconn * conn
Definition: streamutil.c:53
ParallelSlotResultHandler handler
Definition: parallel_slot.h:32
PGconn * connection
Definition: parallel_slot.h:23
void * handler_context
Definition: parallel_slot.h:33
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513