PostgreSQL Source Code git master
parallel_slot.c File Reference
#include "postgres_fe.h"
#include <sys/select.h>
#include "common/logging.h"
#include "fe_utils/cancel.h"
#include "fe_utils/parallel_slot.h"
#include "fe_utils/query_utils.h"
Include dependency graph for parallel_slot.c:

Go to the source code of this file.

Macros

#define ERRCODE_UNDEFINED_TABLE   "42P01"
 

Functions

static int select_loop (int maxFd, fd_set *workerset)
 
static bool processQueryResult (ParallelSlot *slot, PGresult *result)
 
static bool consumeQueryResult (ParallelSlot *slot)
 
static int find_matching_idle_slot (const ParallelSlotArray *sa, const char *dbname)
 
static int find_unconnected_slot (const ParallelSlotArray *sa)
 
static int find_any_idle_slot (const ParallelSlotArray *sa)
 
static bool wait_on_slots (ParallelSlotArray *sa)
 
static void connect_slot (ParallelSlotArray *sa, int slotno, const char *dbname)
 
ParallelSlotParallelSlotsGetIdle (ParallelSlotArray *sa, const char *dbname)
 
ParallelSlotArrayParallelSlotsSetup (int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
 
void ParallelSlotsAdoptConn (ParallelSlotArray *sa, PGconn *conn)
 
void ParallelSlotsTerminate (ParallelSlotArray *sa)
 
bool ParallelSlotsWaitCompletion (ParallelSlotArray *sa)
 
bool TableCommandResultHandler (PGresult *res, PGconn *conn, void *context)
 

Macro Definition Documentation

◆ ERRCODE_UNDEFINED_TABLE

#define ERRCODE_UNDEFINED_TABLE   "42P01"

Definition at line 28 of file parallel_slot.c.

Function Documentation

◆ connect_slot()

static void connect_slot ( ParallelSlotArray sa,
int  slotno,
const char *  dbname 
)
static

Definition at line 287 of file parallel_slot.c.

288{
289 const char *old_override;
290 ParallelSlot *slot = &sa->slots[slotno];
291
292 old_override = sa->cparams->override_dbname;
293 if (dbname)
294 sa->cparams->override_dbname = dbname;
295 slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
296 sa->cparams->override_dbname = old_override;
297
298 /*
299 * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
300 * FD_SET() and allied macros. Windows defines it as a ceiling on the
301 * count of file descriptors in the set, not a ceiling on the value of
302 * each file descriptor; see
303 * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
304 * and
305 * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
306 * We can't ignore that, because Windows starts file descriptors at a
307 * higher value, delays reuse, and skips values. With less than ten
308 * concurrent file descriptors, opened and closed rapidly, one can reach
309 * file descriptor 1024.
310 *
311 * Doing a hard exit here is a bit grotty, but it doesn't seem worth
312 * complicating the API to make it less grotty.
313 */
314#ifdef WIN32
315 if (slotno >= FD_SETSIZE)
316 {
317 pg_log_error("too many jobs for this platform: %d", slotno);
318 exit(1);
319 }
320#else
321 {
322 int fd = PQsocket(slot->connection);
323
324 if (fd >= FD_SETSIZE)
325 {
326 pg_log_error("socket file descriptor out of range for select(): %d",
327 fd);
328 pg_log_error_hint("Try fewer jobs.");
329 exit(1);
330 }
331 }
332#endif
333
334 /* Setup the connection using the supplied command, if any. */
335 if (sa->initcmd)
336 executeCommand(slot->connection, sa->initcmd, sa->echo);
337}
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7533
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
#define pg_log_error_hint(...)
Definition: logging.h:112
static PGconn * connectDatabase(const char *dbname, const char *connection_string, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1663
static void executeCommand(PGconn *conn, const char *query)
Definition: pg_dumpall.c:1906
static int fd(const char *x, int i)
Definition: preproc-init.c:105
char * dbname
Definition: streamutil.c:50
PGconn * connection
Definition: parallel_slot.h:23

References connectDatabase(), ParallelSlot::connection, dbname, executeCommand(), exit(), fd(), pg_log_error, pg_log_error_hint, and PQsocket().

Referenced by ParallelSlotsGetIdle().

◆ consumeQueryResult()

static bool consumeQueryResult ( ParallelSlot slot)
static

Definition at line 58 of file parallel_slot.c.

59{
60 bool ok = true;
61 PGresult *result;
62
64 while ((result = PQgetResult(slot->connection)) != NULL)
65 {
66 if (!processQueryResult(slot, result))
67 ok = false;
68 }
70 return ok;
71}
void ResetCancelConn(void)
Definition: cancel.c:107
void SetCancelConn(PGconn *conn)
Definition: cancel.c:77
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
Definition: parallel_slot.c:39

References ParallelSlot::connection, PQgetResult(), processQueryResult(), ResetCancelConn(), and SetCancelConn().

Referenced by ParallelSlotsWaitCompletion().

◆ find_any_idle_slot()

static int find_any_idle_slot ( const ParallelSlotArray sa)
static

Definition at line 179 of file parallel_slot.c.

180{
181 int i;
182
183 for (i = 0; i < sa->numslots; i++)
184 if (!sa->slots[i].inUse)
185 return i;
186
187 return -1;
188}
int i
Definition: isn.c:72

References i.

Referenced by ParallelSlotsGetIdle().

◆ find_matching_idle_slot()

static int find_matching_idle_slot ( const ParallelSlotArray sa,
const char *  dbname 
)
static

Definition at line 135 of file parallel_slot.c.

136{
137 int i;
138
139 for (i = 0; i < sa->numslots; i++)
140 {
141 if (sa->slots[i].inUse)
142 continue;
143
144 if (sa->slots[i].connection == NULL)
145 continue;
146
147 if (dbname == NULL ||
148 strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149 return i;
150 }
151 return -1;
152}
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:7335

References dbname, i, and PQdb().

Referenced by ParallelSlotsGetIdle().

◆ find_unconnected_slot()

static int find_unconnected_slot ( const ParallelSlotArray sa)
static

Definition at line 159 of file parallel_slot.c.

160{
161 int i;
162
163 for (i = 0; i < sa->numslots; i++)
164 {
165 if (sa->slots[i].inUse)
166 continue;
167
168 if (sa->slots[i].connection == NULL)
169 return i;
170 }
171
172 return -1;
173}

References i.

Referenced by ParallelSlotsAdoptConn(), and ParallelSlotsGetIdle().

◆ ParallelSlotsAdoptConn()

void ParallelSlotsAdoptConn ( ParallelSlotArray sa,
PGconn conn 
)

Definition at line 460 of file parallel_slot.c.

461{
462 int offset;
463
464 offset = find_unconnected_slot(sa);
465 if (offset >= 0)
466 sa->slots[offset].connection = conn;
467 else
469}
void disconnectDatabase(PGconn *conn)
static int find_unconnected_slot(const ParallelSlotArray *sa)
PGconn * conn
Definition: streamutil.c:53

References conn, disconnectDatabase(), and find_unconnected_slot().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsGetIdle()

ParallelSlot * ParallelSlotsGetIdle ( ParallelSlotArray sa,
const char *  dbname 
)

Definition at line 371 of file parallel_slot.c.

372{
373 int offset;
374
375 Assert(sa);
376 Assert(sa->numslots > 0);
377
378 while (1)
379 {
380 /* First choice: a slot already connected to the desired database. */
382 if (offset >= 0)
383 {
384 sa->slots[offset].inUse = true;
385 return &sa->slots[offset];
386 }
387
388 /* Second choice: a slot not connected to any database. */
389 offset = find_unconnected_slot(sa);
390 if (offset >= 0)
391 {
392 connect_slot(sa, offset, dbname);
393 sa->slots[offset].inUse = true;
394 return &sa->slots[offset];
395 }
396
397 /* Third choice: a slot connected to the wrong database. */
398 offset = find_any_idle_slot(sa);
399 if (offset >= 0)
400 {
401 disconnectDatabase(sa->slots[offset].connection);
402 sa->slots[offset].connection = NULL;
403 connect_slot(sa, offset, dbname);
404 sa->slots[offset].inUse = true;
405 return &sa->slots[offset];
406 }
407
408 /*
409 * Fourth choice: block until one or more slots become available. If
410 * any slots hit a fatal error, we'll find out about that here and
411 * return NULL.
412 */
413 if (!wait_on_slots(sa))
414 return NULL;
415 }
416}
#define Assert(condition)
Definition: c.h:815
static bool wait_on_slots(ParallelSlotArray *sa)
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
static int find_any_idle_slot(const ParallelSlotArray *sa)

References Assert, connect_slot(), dbname, disconnectDatabase(), find_any_idle_slot(), find_matching_idle_slot(), find_unconnected_slot(), and wait_on_slots().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsSetup()

ParallelSlotArray * ParallelSlotsSetup ( int  numslots,
ConnParams cparams,
const char *  progname,
bool  echo,
const char *  initcmd 
)

Definition at line 428 of file parallel_slot.c.

430{
432
433 Assert(numslots > 0);
434 Assert(cparams != NULL);
435 Assert(progname != NULL);
436
437 sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
438 numslots * sizeof(ParallelSlot));
439
440 sa->numslots = numslots;
441 sa->cparams = cparams;
442 sa->progname = progname;
443 sa->echo = echo;
444 sa->initcmd = initcmd;
445
446 return sa;
447}
struct ParallelSlot ParallelSlot
Definition: parallel.h:52
const char * progname
Definition: main.c:44
void * palloc0(Size size)
Definition: mcxt.c:1347

References Assert, palloc0(), and progname.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsTerminate()

void ParallelSlotsTerminate ( ParallelSlotArray sa)

Definition at line 479 of file parallel_slot.c.

480{
481 int i;
482
483 for (i = 0; i < sa->numslots; i++)
484 {
485 PGconn *conn = sa->slots[i].connection;
486
487 if (conn == NULL)
488 continue;
489
491 }
492}

References conn, disconnectDatabase(), and i.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsWaitCompletion()

bool ParallelSlotsWaitCompletion ( ParallelSlotArray sa)

Definition at line 501 of file parallel_slot.c.

502{
503 int i;
504
505 for (i = 0; i < sa->numslots; i++)
506 {
507 if (sa->slots[i].connection == NULL)
508 continue;
509 if (!consumeQueryResult(&sa->slots[i]))
510 return false;
511 /* Mark connection as idle */
512 sa->slots[i].inUse = false;
513 ParallelSlotClearHandler(&sa->slots[i]);
514 }
515
516 return true;
517}
static bool consumeQueryResult(ParallelSlot *slot)
Definition: parallel_slot.c:58
static void ParallelSlotClearHandler(ParallelSlot *slot)
Definition: parallel_slot.h:55

References consumeQueryResult(), i, and ParallelSlotClearHandler().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ processQueryResult()

static bool processQueryResult ( ParallelSlot slot,
PGresult result 
)
static

Definition at line 39 of file parallel_slot.c.

40{
41 Assert(slot->handler != NULL);
42
43 /* On failure, the handler should return NULL after freeing the result */
44 if (!slot->handler(result, slot->connection, slot->handler_context))
45 return false;
46
47 /* Ok, we have to free it ourself */
48 PQclear(result);
49 return true;
50}
ParallelSlotResultHandler handler
Definition: parallel_slot.h:32
void * handler_context
Definition: parallel_slot.h:33

References Assert, ParallelSlot::connection, ParallelSlot::handler, ParallelSlot::handler_context, and PQclear().

Referenced by consumeQueryResult(), and wait_on_slots().

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset 
)
static

Definition at line 80 of file parallel_slot.c.

81{
82 int i;
83 fd_set saveSet = *workerset;
84
86 return -1;
87
88 for (;;)
89 {
90 /*
91 * On Windows, we need to check once in a while for cancel requests;
92 * on other platforms we rely on select() returning when interrupted.
93 */
94 struct timeval *tvp;
95#ifdef WIN32
96 struct timeval tv = {0, 1000000};
97
98 tvp = &tv;
99#else
100 tvp = NULL;
101#endif
102
103 *workerset = saveSet;
104 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105
106#ifdef WIN32
107 if (i == SOCKET_ERROR)
108 {
109 i = -1;
110
111 if (WSAGetLastError() == WSAEINTR)
112 errno = EINTR;
113 }
114#endif
115
116 if (i < 0 && errno == EINTR)
117 continue; /* ignore this */
118 if (i < 0 || CancelRequested)
119 return -1; /* but not this */
120 if (i == 0)
121 continue; /* timeout (Win32 only) */
122 break;
123 }
124
125 return i;
126}
volatile sig_atomic_t CancelRequested
Definition: cancel.c:59
#define EINTR
Definition: win32_port.h:364
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503

References CancelRequested, EINTR, i, and select.

Referenced by wait_on_slots().

◆ TableCommandResultHandler()

bool TableCommandResultHandler ( PGresult res,
PGconn conn,
void *  context 
)

Definition at line 540 of file parallel_slot.c.

541{
542 Assert(res != NULL);
543 Assert(conn != NULL);
544
545 /*
546 * If it's an error, report it. Errors about a missing table are harmless
547 * so we continue processing; but die for other errors.
548 */
550 {
551 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
552
553 pg_log_error("processing of database \"%s\" failed: %s",
555
556 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
557 {
558 PQclear(res);
559 return false;
560 }
561 }
562
563 return true;
564}
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7507
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3466
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:122
#define ERRCODE_UNDEFINED_TABLE
Definition: parallel_slot.c:28
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57

References Assert, conn, ERRCODE_UNDEFINED_TABLE, PG_DIAG_SQLSTATE, pg_log_error, PGRES_COMMAND_OK, PQclear(), PQdb(), PQerrorMessage(), PQresultErrorField(), PQresultStatus(), and res.

Referenced by reindex_one_database(), and vacuum_one_database().

◆ wait_on_slots()

static bool wait_on_slots ( ParallelSlotArray sa)
static

Definition at line 196 of file parallel_slot.c.

197{
198 int i;
199 fd_set slotset;
200 int maxFd = 0;
201 PGconn *cancelconn = NULL;
202
203 /* We must reconstruct the fd_set for each call to select_loop */
204 FD_ZERO(&slotset);
205
206 for (i = 0; i < sa->numslots; i++)
207 {
208 int sock;
209
210 /* We shouldn't get here if we still have slots without connections */
211 Assert(sa->slots[i].connection != NULL);
212
213 sock = PQsocket(sa->slots[i].connection);
214
215 /*
216 * We don't really expect any connections to lose their sockets after
217 * startup, but just in case, cope by ignoring them.
218 */
219 if (sock < 0)
220 continue;
221
222 /* Keep track of the first valid connection we see. */
223 if (cancelconn == NULL)
224 cancelconn = sa->slots[i].connection;
225
226 FD_SET(sock, &slotset);
227 if (sock > maxFd)
228 maxFd = sock;
229 }
230
231 /*
232 * If we get this far with no valid connections, processing cannot
233 * continue.
234 */
235 if (cancelconn == NULL)
236 return false;
237
238 SetCancelConn(cancelconn);
239 i = select_loop(maxFd, &slotset);
241
242 /* failure? */
243 if (i < 0)
244 return false;
245
246 for (i = 0; i < sa->numslots; i++)
247 {
248 int sock;
249
250 sock = PQsocket(sa->slots[i].connection);
251
252 if (sock >= 0 && FD_ISSET(sock, &slotset))
253 {
254 /* select() says input is available, so consume it */
255 PQconsumeInput(sa->slots[i].connection);
256 }
257
258 /* Collect result(s) as long as any are available */
259 while (!PQisBusy(sa->slots[i].connection))
260 {
261 PGresult *result = PQgetResult(sa->slots[i].connection);
262
263 if (result != NULL)
264 {
265 /* Handle and discard the command result */
266 if (!processQueryResult(&sa->slots[i], result))
267 return false;
268 }
269 else
270 {
271 /* This connection has become idle */
272 sa->slots[i].inUse = false;
273 ParallelSlotClearHandler(&sa->slots[i]);
274 break;
275 }
276 }
277 }
278 return true;
279}
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel_slot.c:80

References Assert, i, ParallelSlotClearHandler(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), processQueryResult(), ResetCancelConn(), select_loop(), and SetCancelConn().

Referenced by ParallelSlotsGetIdle().