PostgreSQL Source Code  git master
parallel_slot.c File Reference
Include dependency graph for parallel_slot.c:

Go to the source code of this file.

Macros

#define ERRCODE_UNDEFINED_TABLE   "42P01"
 

Functions

static int select_loop (int maxFd, fd_set *workerset)
 
static bool processQueryResult (ParallelSlot *slot, PGresult *result)
 
static bool consumeQueryResult (ParallelSlot *slot)
 
static int find_matching_idle_slot (const ParallelSlotArray *sa, const char *dbname)
 
static int find_unconnected_slot (const ParallelSlotArray *sa)
 
static int find_any_idle_slot (const ParallelSlotArray *sa)
 
static bool wait_on_slots (ParallelSlotArray *sa)
 
static void connect_slot (ParallelSlotArray *sa, int slotno, const char *dbname)
 
ParallelSlot * ParallelSlotsGetIdle (ParallelSlotArray *sa, const char *dbname)
 
ParallelSlotArray * ParallelSlotsSetup (int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
 
void ParallelSlotsAdoptConn (ParallelSlotArray *sa, PGconn *conn)
 
void ParallelSlotsTerminate (ParallelSlotArray *sa)
 
bool ParallelSlotsWaitCompletion (ParallelSlotArray *sa)
 
bool TableCommandResultHandler (PGresult *res, PGconn *conn, void *context)
 

Macro Definition Documentation

◆ ERRCODE_UNDEFINED_TABLE

#define ERRCODE_UNDEFINED_TABLE   "42P01"

Definition at line 30 of file parallel_slot.c.

Function Documentation

◆ connect_slot()

static void connect_slot ( ParallelSlotArray *  sa,
int  slotno,
const char *  dbname 
)
static

Definition at line 289 of file parallel_slot.c.

290 {
291  const char *old_override;
292  ParallelSlot *slot = &sa->slots[slotno];
293 
294  old_override = sa->cparams->override_dbname;
295  if (dbname)
296  sa->cparams->override_dbname = dbname;
297  slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
298  sa->cparams->override_dbname = old_override;
299 
300  if (PQsocket(slot->connection) >= FD_SETSIZE)
301  pg_fatal("too many jobs for this platform");
302 
303  /* Setup the connection using the supplied command, if any. */
304  if (sa->initcmd)
305  executeCommand(slot->connection, sa->initcmd, sa->echo);
306 }
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6934
#define pg_fatal(...)
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1476
static void executeCommand(PGconn *conn, const char *query)
Definition: pg_dumpall.c:1722
char * dbname
Definition: streamutil.c:51
PGconn * connection
Definition: parallel_slot.h:23

References connectDatabase(), ParallelSlot::connection, dbname, executeCommand(), pg_fatal, and PQsocket().

Referenced by ParallelSlotsGetIdle().

◆ consumeQueryResult()

static bool consumeQueryResult ( ParallelSlot *  slot)
static

Definition at line 60 of file parallel_slot.c.

61 {
62  bool ok = true;
63  PGresult *result;
64 
65  SetCancelConn(slot->connection);
66  while ((result = PQgetResult(slot->connection)) != NULL)
67  {
68  if (!processQueryResult(slot, result))
69  ok = false;
70  }
71  ResetCancelConn();
72  return ok;
73 }
void ResetCancelConn(void)
Definition: cancel.c:107
void SetCancelConn(PGconn *conn)
Definition: cancel.c:77
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2082
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
Definition: parallel_slot.c:41

References ParallelSlot::connection, PQgetResult(), processQueryResult(), ResetCancelConn(), and SetCancelConn().

Referenced by ParallelSlotsWaitCompletion().

◆ find_any_idle_slot()

static int find_any_idle_slot ( const ParallelSlotArray *  sa)
static

Definition at line 181 of file parallel_slot.c.

182 {
183  int i;
184 
185  for (i = 0; i < sa->numslots; i++)
186  if (!sa->slots[i].inUse)
187  return i;
188 
189  return -1;
190 }
int i
Definition: isn.c:73

References i.

Referenced by ParallelSlotsGetIdle().

◆ find_matching_idle_slot()

static int find_matching_idle_slot ( const ParallelSlotArray *  sa,
const char *  dbname 
)
static

Definition at line 137 of file parallel_slot.c.

138 {
139  int i;
140 
141  for (i = 0; i < sa->numslots; i++)
142  {
143  if (sa->slots[i].inUse)
144  continue;
145 
146  if (sa->slots[i].connection == NULL)
147  continue;
148 
149  if (dbname == NULL ||
150  strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
151  return i;
152  }
153  return -1;
154 }
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:6754

References dbname, i, and PQdb().

Referenced by ParallelSlotsGetIdle().

◆ find_unconnected_slot()

static int find_unconnected_slot ( const ParallelSlotArray *  sa)
static

Definition at line 161 of file parallel_slot.c.

162 {
163  int i;
164 
165  for (i = 0; i < sa->numslots; i++)
166  {
167  if (sa->slots[i].inUse)
168  continue;
169 
170  if (sa->slots[i].connection == NULL)
171  return i;
172  }
173 
174  return -1;
175 }

References i.

Referenced by ParallelSlotsAdoptConn(), and ParallelSlotsGetIdle().

◆ ParallelSlotsAdoptConn()

void ParallelSlotsAdoptConn ( ParallelSlotArray *  sa,
PGconn *  conn 
)

Definition at line 429 of file parallel_slot.c.

430 {
431  int offset;
432 
433  offset = find_unconnected_slot(sa);
434  if (offset >= 0)
435  sa->slots[offset].connection = conn;
436  else
437  disconnectDatabase(conn);
438 }
void disconnectDatabase(PGconn *conn)
static int find_unconnected_slot(const ParallelSlotArray *sa)
PGconn * conn
Definition: streamutil.c:54

References conn, disconnectDatabase(), and find_unconnected_slot().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsGetIdle()

ParallelSlot* ParallelSlotsGetIdle ( ParallelSlotArray *  sa,
const char *  dbname 
)

Definition at line 340 of file parallel_slot.c.

341 {
342  int offset;
343 
344  Assert(sa);
345  Assert(sa->numslots > 0);
346 
347  while (1)
348  {
349  /* First choice: a slot already connected to the desired database. */
350  offset = find_matching_idle_slot(sa, dbname);
351  if (offset >= 0)
352  {
353  sa->slots[offset].inUse = true;
354  return &sa->slots[offset];
355  }
356 
357  /* Second choice: a slot not connected to any database. */
358  offset = find_unconnected_slot(sa);
359  if (offset >= 0)
360  {
361  connect_slot(sa, offset, dbname);
362  sa->slots[offset].inUse = true;
363  return &sa->slots[offset];
364  }
365 
366  /* Third choice: a slot connected to the wrong database. */
367  offset = find_any_idle_slot(sa);
368  if (offset >= 0)
369  {
370  disconnectDatabase(sa->slots[offset].connection);
371  sa->slots[offset].connection = NULL;
372  connect_slot(sa, offset, dbname);
373  sa->slots[offset].inUse = true;
374  return &sa->slots[offset];
375  }
376 
377  /*
378  * Fourth choice: block until one or more slots become available. If
379  * any slots hit a fatal error, we'll find out about that here and
380  * return NULL.
381  */
382  if (!wait_on_slots(sa))
383  return NULL;
384  }
385 }
Assert(fmt[strlen(fmt) - 1] !='\n')
static bool wait_on_slots(ParallelSlotArray *sa)
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
static int find_any_idle_slot(const ParallelSlotArray *sa)

References Assert(), connect_slot(), dbname, disconnectDatabase(), find_any_idle_slot(), find_matching_idle_slot(), find_unconnected_slot(), and wait_on_slots().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsSetup()

ParallelSlotArray* ParallelSlotsSetup ( int  numslots,
ConnParams *  cparams,
const char *  progname,
bool  echo,
const char *  initcmd 
)

Definition at line 397 of file parallel_slot.c.

399 {
400  ParallelSlotArray *sa;
401 
402  Assert(numslots > 0);
403  Assert(cparams != NULL);
404  Assert(progname != NULL);
405 
406  sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
407  numslots * sizeof(ParallelSlot));
408 
409  sa->numslots = numslots;
410  sa->cparams = cparams;
411  sa->progname = progname;
412  sa->echo = echo;
413  sa->initcmd = initcmd;
414 
415  return sa;
416 }
struct ParallelSlot ParallelSlot
Definition: parallel.h:52
#define offsetof(type, field)
Definition: c.h:727
const char * progname
Definition: main.c:50
void * palloc0(Size size)
Definition: mcxt.c:1099

References Assert(), offsetof, palloc0(), and progname.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsTerminate()

void ParallelSlotsTerminate ( ParallelSlotArray *  sa)

Definition at line 448 of file parallel_slot.c.

449 {
450  int i;
451 
452  for (i = 0; i < sa->numslots; i++)
453  {
454  PGconn *conn = sa->slots[i].connection;
455 
456  if (conn == NULL)
457  continue;
458 
459  disconnectDatabase(conn);
460  }
461 }

References conn, disconnectDatabase(), and i.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsWaitCompletion()

bool ParallelSlotsWaitCompletion ( ParallelSlotArray *  sa)

Definition at line 470 of file parallel_slot.c.

471 {
472  int i;
473 
474  for (i = 0; i < sa->numslots; i++)
475  {
476  if (sa->slots[i].connection == NULL)
477  continue;
478  if (!consumeQueryResult(&sa->slots[i]))
479  return false;
480  }
481 
482  return true;
483 }
static bool consumeQueryResult(ParallelSlot *slot)
Definition: parallel_slot.c:60

References consumeQueryResult(), and i.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ processQueryResult()

static bool processQueryResult ( ParallelSlot *  slot,
PGresult *  result 
)
static

Definition at line 41 of file parallel_slot.c.

42 {
43  Assert(slot->handler != NULL);
44 
45  /* On failure, the handler should return NULL after freeing the result */
46  if (!slot->handler(result, slot->connection, slot->handler_context))
47  return false;
48 
49  /* Ok, we have to free it ourself */
50  PQclear(result);
51  return true;
52 }
void PQclear(PGresult *res)
Definition: fe-exec.c:718
ParallelSlotResultHandler handler
Definition: parallel_slot.h:32
void * handler_context
Definition: parallel_slot.h:33

References Assert(), ParallelSlot::connection, ParallelSlot::handler, ParallelSlot::handler_context, and PQclear().

Referenced by consumeQueryResult(), and wait_on_slots().

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset 
)
static

Definition at line 82 of file parallel_slot.c.

83 {
84  int i;
85  fd_set saveSet = *workerset;
86 
87  if (CancelRequested)
88  return -1;
89 
90  for (;;)
91  {
92  /*
93  * On Windows, we need to check once in a while for cancel requests;
94  * on other platforms we rely on select() returning when interrupted.
95  */
96  struct timeval *tvp;
97 #ifdef WIN32
98  struct timeval tv = {0, 1000000};
99 
100  tvp = &tv;
101 #else
102  tvp = NULL;
103 #endif
104 
105  *workerset = saveSet;
106  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
107 
108 #ifdef WIN32
109  if (i == SOCKET_ERROR)
110  {
111  i = -1;
112 
113  if (WSAGetLastError() == WSAEINTR)
114  errno = EINTR;
115  }
116 #endif
117 
118  if (i < 0 && errno == EINTR)
119  continue; /* ignore this */
120  if (i < 0 || CancelRequested)
121  return -1; /* but not this */
122  if (i == 0)
123  continue; /* timeout (Win32 only) */
124  break;
125  }
126 
127  return i;
128 }
volatile sig_atomic_t CancelRequested
Definition: cancel.c:59
#define EINTR
Definition: win32_port.h:351
#define select(n, r, w, e, timeout)
Definition: win32_port.h:474

References CancelRequested, EINTR, i, and select.

Referenced by wait_on_slots().

◆ TableCommandResultHandler()

bool TableCommandResultHandler ( PGresult *  res,
PGconn *  conn,
void *  context 
)

Definition at line 506 of file parallel_slot.c.

507 {
508  Assert(res != NULL);
509  Assert(conn != NULL);
510 
511  /*
512  * If it's an error, report it. Errors about a missing table are harmless
513  * so we continue processing; but die for other errors.
514  */
515  if (PQresultStatus(res) != PGRES_COMMAND_OK)
516  {
517  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
518 
519  pg_log_error("processing of database \"%s\" failed: %s",
520  PQdb(conn), PQerrorMessage(conn));
521 
522  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
523  {
524  PQclear(res);
525  return false;
526  }
527  }
528 
529  return true;
530 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6908
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3270
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3325
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
#define pg_log_error(...)
Definition: logging.h:106
#define ERRCODE_UNDEFINED_TABLE
Definition: parallel_slot.c:30
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57

References Assert(), conn, ERRCODE_UNDEFINED_TABLE, PG_DIAG_SQLSTATE, pg_log_error, PGRES_COMMAND_OK, PQclear(), PQdb(), PQerrorMessage(), PQresultErrorField(), PQresultStatus(), and res.

Referenced by reindex_one_database(), and vacuum_one_database().

◆ wait_on_slots()

static bool wait_on_slots ( ParallelSlotArray *  sa)
static

Definition at line 198 of file parallel_slot.c.

199 {
200  int i;
201  fd_set slotset;
202  int maxFd = 0;
203  PGconn *cancelconn = NULL;
204 
205  /* We must reconstruct the fd_set for each call to select_loop */
206  FD_ZERO(&slotset);
207 
208  for (i = 0; i < sa->numslots; i++)
209  {
210  int sock;
211 
212  /* We shouldn't get here if we still have slots without connections */
213  Assert(sa->slots[i].connection != NULL);
214 
215  sock = PQsocket(sa->slots[i].connection);
216 
217  /*
218  * We don't really expect any connections to lose their sockets after
219  * startup, but just in case, cope by ignoring them.
220  */
221  if (sock < 0)
222  continue;
223 
224  /* Keep track of the first valid connection we see. */
225  if (cancelconn == NULL)
226  cancelconn = sa->slots[i].connection;
227 
228  FD_SET(sock, &slotset);
229  if (sock > maxFd)
230  maxFd = sock;
231  }
232 
233  /*
234  * If we get this far with no valid connections, processing cannot
235  * continue.
236  */
237  if (cancelconn == NULL)
238  return false;
239 
240  SetCancelConn(sa->slots->connection);
241  i = select_loop(maxFd, &slotset);
242  ResetCancelConn();
243 
244  /* failure? */
245  if (i < 0)
246  return false;
247 
248  for (i = 0; i < sa->numslots; i++)
249  {
250  int sock;
251 
252  sock = PQsocket(sa->slots[i].connection);
253 
254  if (sock >= 0 && FD_ISSET(sock, &slotset))
255  {
256  /* select() says input is available, so consume it */
257  PQconsumeInput(sa->slots[i].connection);
258  }
259 
260  /* Collect result(s) as long as any are available */
261  while (!PQisBusy(sa->slots[i].connection))
262  {
263  PGresult *result = PQgetResult(sa->slots[i].connection);
264 
265  if (result != NULL)
266  {
267  /* Handle and discard the command result */
268  if (!processQueryResult(&sa->slots[i], result))
269  return false;
270  }
271  else
272  {
273  /* This connection has become idle */
274  sa->slots[i].inUse = false;
275  ParallelSlotClearHandler(&sa->slots[i]);
276  break;
277  }
278  }
279  }
280  return true;
281 }
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:2004
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2051
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel_slot.c:82
static void ParallelSlotClearHandler(ParallelSlot *slot)
Definition: parallel_slot.h:55

References Assert(), i, ParallelSlotClearHandler(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), processQueryResult(), ResetCancelConn(), select_loop(), and SetCancelConn().

Referenced by ParallelSlotsGetIdle().