PostgreSQL Source Code  git master
parallel_slot.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * parallel_slot.c
4  * Parallel support for front-end parallel database connections
5  *
6  *
7  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/fe_utils/parallel_slot.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #ifdef WIN32
16 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
17 #endif
18 
19 #include "postgres_fe.h"
20 
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
23 #endif
24 
25 #include "common/logging.h"
26 #include "fe_utils/cancel.h"
27 #include "fe_utils/parallel_slot.h"
28 #include "fe_utils/query_utils.h"
29 
30 #define ERRCODE_UNDEFINED_TABLE "42P01" /* SQLSTATE that TableCommandResultHandler treats as non-fatal */
31 
32 static int select_loop(int maxFd, fd_set *workerset); /* forward declaration; defined below */
33 static bool processQueryResult(ParallelSlot *slot, PGresult *result); /* forward declaration; defined below */
34 
35 /*
36  * Process (and delete) a query result. Returns true if there's no problem,
37  * false otherwise. It's up to the handler to decide what constitutes a
38  * problem.
39  */
40 static bool
42 {
43  Assert(slot->handler != NULL);
44 
45  /* On failure, the handler should return NULL after freeing the result */
46  if (!slot->handler(result, slot->connection, slot->handler_context))
47  return false;
48 
49  /* Ok, we have to free it ourself */
50  PQclear(result);
51  return true;
52 }
53 
54 /*
55  * Consume all the results generated for the given connection until
56  * nothing remains. If at least one error is encountered, return false.
57  * Note that this will block if the connection is busy.
58  */
59 static bool
61 {
62  bool ok = true;
63  PGresult *result;
64 
66  while ((result = PQgetResult(slot->connection)) != NULL)
67  {
68  if (!processQueryResult(slot, result))
69  ok = false;
70  }
72  return ok;
73 }
74 
75 /*
76  * Wait until a file descriptor from the given set becomes readable.
77  *
78  * Returns the number of ready descriptors, or -1 on failure (including
79  * getting a cancel request).
80  */
81 static int
82 select_loop(int maxFd, fd_set *workerset)
83 {
84  int i;
85  fd_set saveSet = *workerset;
86 
87  if (CancelRequested)
88  return -1;
89 
90  for (;;)
91  {
92  /*
93  * On Windows, we need to check once in a while for cancel requests;
94  * on other platforms we rely on select() returning when interrupted.
95  */
96  struct timeval *tvp;
97 #ifdef WIN32
98  struct timeval tv = {0, 1000000};
99 
100  tvp = &tv;
101 #else
102  tvp = NULL;
103 #endif
104 
105  *workerset = saveSet;
106  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
107 
108 #ifdef WIN32
109  if (i == SOCKET_ERROR)
110  {
111  i = -1;
112 
113  if (WSAGetLastError() == WSAEINTR)
114  errno = EINTR;
115  }
116 #endif
117 
118  if (i < 0 && errno == EINTR)
119  continue; /* ignore this */
120  if (i < 0 || CancelRequested)
121  return -1; /* but not this */
122  if (i == 0)
123  continue; /* timeout (Win32 only) */
124  break;
125  }
126 
127  return i;
128 }
129 
130 /*
131  * Return the offset of a suitable idle slot, or -1 if none are available. If
132  * the given dbname is not null, only idle slots connected to the given
133  * database are considered suitable, otherwise all idle connected slots are
134  * considered suitable.
135  */
136 static int
138 {
139  int i;
140 
141  for (i = 0; i < sa->numslots; i++)
142  {
143  if (sa->slots[i].inUse)
144  continue;
145 
146  if (sa->slots[i].connection == NULL)
147  continue;
148 
149  if (dbname == NULL ||
150  strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
151  return i;
152  }
153  return -1;
154 }
155 
156 /*
157  * Return the offset of the first slot without a database connection, or -1 if
158  * all slots are connected.
159  */
160 static int
162 {
163  int i;
164 
165  for (i = 0; i < sa->numslots; i++)
166  {
167  if (sa->slots[i].inUse)
168  continue;
169 
170  if (sa->slots[i].connection == NULL)
171  return i;
172  }
173 
174  return -1;
175 }
176 
177 /*
178  * Return the offset of the first idle slot, or -1 if all slots are busy.
179  */
180 static int
182 {
183  int i;
184 
185  for (i = 0; i < sa->numslots; i++)
186  if (!sa->slots[i].inUse)
187  return i;
188 
189  return -1;
190 }
191 
192 /*
193  * Wait for any slot's connection to have query results, consume the results,
194  * and update the slot's status as appropriate. Returns true on success,
195  * false on cancellation, on error, or if no slots are connected.
196  */
197 static bool
199 {
200  int i;
201  fd_set slotset;
202  int maxFd = 0;
203  PGconn *cancelconn = NULL;
204 
205  /* We must reconstruct the fd_set for each call to select_loop */
206  FD_ZERO(&slotset);
207 
208  for (i = 0; i < sa->numslots; i++)
209  {
210  int sock;
211 
212  /* We shouldn't get here if we still have slots without connections */
213  Assert(sa->slots[i].connection != NULL);
214 
215  sock = PQsocket(sa->slots[i].connection);
216 
217  /*
218  * We don't really expect any connections to lose their sockets after
219  * startup, but just in case, cope by ignoring them.
220  */
221  if (sock < 0)
222  continue;
223 
224  /* Keep track of the first valid connection we see. */
225  if (cancelconn == NULL)
226  cancelconn = sa->slots[i].connection;
227 
228  FD_SET(sock, &slotset);
229  if (sock > maxFd)
230  maxFd = sock;
231  }
232 
233  /*
234  * If we get this far with no valid connections, processing cannot
235  * continue.
236  */
237  if (cancelconn == NULL)
238  return false;
239 
241  i = select_loop(maxFd, &slotset);
242  ResetCancelConn();
243 
244  /* failure? */
245  if (i < 0)
246  return false;
247 
248  for (i = 0; i < sa->numslots; i++)
249  {
250  int sock;
251 
252  sock = PQsocket(sa->slots[i].connection);
253 
254  if (sock >= 0 && FD_ISSET(sock, &slotset))
255  {
256  /* select() says input is available, so consume it */
258  }
259 
260  /* Collect result(s) as long as any are available */
261  while (!PQisBusy(sa->slots[i].connection))
262  {
263  PGresult *result = PQgetResult(sa->slots[i].connection);
264 
265  if (result != NULL)
266  {
267  /* Handle and discard the command result */
268  if (!processQueryResult(&sa->slots[i], result))
269  return false;
270  }
271  else
272  {
273  /* This connection has become idle */
274  sa->slots[i].inUse = false;
276  break;
277  }
278  }
279  }
280  return true;
281 }
282 
283 /*
284  * Open a new database connection using the stored connection parameters and
285  * optionally a given dbname if not null, execute the stored initial command if
286  * any, and associate the new connection with the given slot.
287  */
288 static void
289 connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
290 {
291  const char *old_override;
292  ParallelSlot *slot = &sa->slots[slotno];
293 
294  old_override = sa->cparams->override_dbname;
295  if (dbname)
297  slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
298  sa->cparams->override_dbname = old_override;
299 
300  if (PQsocket(slot->connection) >= FD_SETSIZE)
301  {
302  pg_log_fatal("too many jobs for this platform");
303  exit(1);
304  }
305 
306  /* Setup the connection using the supplied command, if any. */
307  if (sa->initcmd)
308  executeCommand(slot->connection, sa->initcmd, sa->echo);
309 }
310 
311 /*
312  * ParallelSlotsGetIdle
313  * Return a connection slot that is ready to execute a command.
314  *
315  * The slot returned is chosen as follows:
316  *
317  * If any idle slot already has an open connection, and if either dbname is
318  * null or the existing connection is to the given database, that slot will be
319  * returned allowing the connection to be reused.
320  *
321  * Otherwise, if any idle slot is not yet connected to any database, the slot
322  * will be returned with it's connection opened using the stored cparams and
323  * optionally the given dbname if not null.
324  *
325  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
326  * after having it's connection disconnected and reconnected using the stored
327  * cparams and optionally the given dbname if not null.
328  *
329  * Otherwise, if any slots have connections that are busy, we loop on select()
330  * until one socket becomes available. When this happens, we read the whole
331  * set and mark as free all sockets that become available. We then select a
332  * slot using the same rules as above.
333  *
334  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
335  *
336  * For any connection created, if the stored initcmd is not null, it will be
337  * executed as a command on the newly formed connection before the slot is
338  * returned.
339  *
340  * If an error occurs, NULL is returned.
341  */
342 ParallelSlot *
344 {
345  int offset;
346 
347  Assert(sa);
348  Assert(sa->numslots > 0);
349 
350  while (1)
351  {
352  /* First choice: a slot already connected to the desired database. */
353  offset = find_matching_idle_slot(sa, dbname);
354  if (offset >= 0)
355  {
356  sa->slots[offset].inUse = true;
357  return &sa->slots[offset];
358  }
359 
360  /* Second choice: a slot not connected to any database. */
361  offset = find_unconnected_slot(sa);
362  if (offset >= 0)
363  {
364  connect_slot(sa, offset, dbname);
365  sa->slots[offset].inUse = true;
366  return &sa->slots[offset];
367  }
368 
369  /* Third choice: a slot connected to the wrong database. */
370  offset = find_any_idle_slot(sa);
371  if (offset >= 0)
372  {
373  disconnectDatabase(sa->slots[offset].connection);
374  sa->slots[offset].connection = NULL;
375  connect_slot(sa, offset, dbname);
376  sa->slots[offset].inUse = true;
377  return &sa->slots[offset];
378  }
379 
380  /*
381  * Fourth choice: block until one or more slots become available. If
382  * any slots hit a fatal error, we'll find out about that here and
383  * return NULL.
384  */
385  if (!wait_on_slots(sa))
386  return NULL;
387  }
388 }
389 
390 /*
391  * ParallelSlotsSetup
392  * Prepare a set of parallel slots but do not connect to any database.
393  *
394  * This creates and initializes a set of slots, marking all parallel slots as
395  * free and ready to use. Establishing connections is delayed until requesting
396  * a free slot. The cparams, progname, echo, and initcmd are stored for later
397  * use and must remain valid for the lifetime of the returned array.
398  */
400 ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
401  bool echo, const char *initcmd)
402 {
404 
405  Assert(numslots > 0);
406  Assert(cparams != NULL);
407  Assert(progname != NULL);
408 
410  numslots * sizeof(ParallelSlot));
411 
412  sa->numslots = numslots;
413  sa->cparams = cparams;
414  sa->progname = progname;
415  sa->echo = echo;
416  sa->initcmd = initcmd;
417 
418  return sa;
419 }
420 
421 /*
422  * ParallelSlotsAdoptConn
423  * Assign an open connection to the slots array for reuse.
424  *
425  * This turns over ownership of an open connection to a slots array. The
426  * caller should not further use or close the connection. All the connection's
427  * parameters (user, host, port, etc.) except possibly dbname should match
428  * those of the slots array's cparams, as given in ParallelSlotsSetup. If
429  * these parameters differ, subsequent behavior is undefined.
430  */
431 void
433 {
434  int offset;
435 
436  offset = find_unconnected_slot(sa);
437  if (offset >= 0)
438  sa->slots[offset].connection = conn;
439  else
440  disconnectDatabase(conn);
441 }
442 
443 /*
444  * ParallelSlotsTerminate
445  * Clean up a set of parallel slots
446  *
447  * Iterate through all connections in a given set of ParallelSlots and
448  * terminate all connections.
449  */
450 void
452 {
453  int i;
454 
455  for (i = 0; i < sa->numslots; i++)
456  {
457  PGconn *conn = sa->slots[i].connection;
458 
459  if (conn == NULL)
460  continue;
461 
462  disconnectDatabase(conn);
463  }
464 }
465 
466 /*
467  * ParallelSlotsWaitCompletion
468  *
469  * Wait for all connections to finish, returning false if at least one
470  * error has been found on the way.
471  */
472 bool
474 {
475  int i;
476 
477  for (i = 0; i < sa->numslots; i++)
478  {
479  if (sa->slots[i].connection == NULL)
480  continue;
481  if (!consumeQueryResult(&sa->slots[i]))
482  return false;
483  }
484 
485  return true;
486 }
487 
488 /*
489  * TableCommandResultHandler
490  *
491  * ParallelSlotResultHandler for results of commands (not queries) against
492  * tables.
493  *
494  * Requires that the result status is either PGRES_COMMAND_OK or an error about
495  * a missing table. This is useful for utilities that compile a list of tables
496  * to process and then run commands (vacuum, reindex, or whatever) against
497  * those tables, as there is a race condition between the time the list is
498  * compiled and the time the command attempts to open the table.
499  *
500  * For missing tables, logs an error but allows processing to continue.
501  *
502  * For all other errors, logs an error and terminates further processing.
503  *
504  * res: PGresult from the query executed on the slot's connection
505  * conn: connection belonging to the slot
506  * context: unused
507  */
508 bool
510 {
511  Assert(res != NULL);
512  Assert(conn != NULL);
513 
514  /*
515  * If it's an error, report it. Errors about a missing table are harmless
516  * so we continue processing; but die for other errors.
517  */
518  if (PQresultStatus(res) != PGRES_COMMAND_OK)
519  {
520  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
521 
522  pg_log_error("processing of database \"%s\" failed: %s",
523  PQdb(conn), PQerrorMessage(conn));
524 
525  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
526  {
527  PQclear(res);
528  return false;
529  }
530  }
531 
532  return true;
533 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6735
const char * progname
Definition: main.c:46
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
#define pg_log_error(...)
Definition: logging.h:80
void ResetCancelConn(void)
Definition: cancel.c:100
struct ParallelSlot ParallelSlot
Definition: parallel.h:37
void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
static int find_unconnected_slot(const ParallelSlotArray *sa)
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3097
ParallelSlotArray * ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
ParallelSlot * ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
void disconnectDatabase(PGconn *conn)
ParallelSlotResultHandler handler
Definition: parallel_slot.h:32
PGconn * conn
Definition: streamutil.c:54
static void executeCommand(PGconn *conn, const char *query)
Definition: pg_dumpall.c:1897
const char * initcmd
Definition: parallel_slot.h:42
#define ERRCODE_UNDEFINED_TABLE
Definition: parallel_slot.c:30
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel_slot.c:82
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
void SetCancelConn(PGconn *conn)
Definition: cancel.c:70
PGconn * connection
Definition: parallel_slot.h:23
char * override_dbname
Definition: pg_backup.h:72
void * palloc0(Size size)
Definition: mcxt.c:1093
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
Definition: parallel_slot.c:41
bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static void ParallelSlotClearHandler(ParallelSlot *slot)
Definition: parallel_slot.h:55
static bool consumeQueryResult(ParallelSlot *slot)
Definition: parallel_slot.c:60
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1853
void * handler_context
Definition: parallel_slot.h:33
void PQclear(PGresult *res)
Definition: fe-exec.c:680
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:6581
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3152
#define Assert(condition)
Definition: c.h:804
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1636
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1900
char * dbname
Definition: streamutil.c:51
volatile sig_atomic_t CancelRequested
Definition: cancel.c:52
const char * progname
Definition: parallel_slot.h:40
void ParallelSlotsTerminate(ParallelSlotArray *sa)
int i
ParallelSlot slots[FLEXIBLE_ARRAY_MEMBER]
Definition: parallel_slot.h:43
bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
#define EINTR
Definition: win32_port.h:343
ConnParams * cparams
Definition: parallel_slot.h:39
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6753
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1927
static int find_any_idle_slot(const ParallelSlotArray *sa)
#define offsetof(type, field)
Definition: c.h:727
static bool wait_on_slots(ParallelSlotArray *sa)
#define pg_log_fatal(...)
Definition: logging.h:76