PostgreSQL Source Code  git master
parallel_slot.c File Reference
#include "postgres_fe.h"
#include <sys/select.h>
#include "common/logging.h"
#include "fe_utils/cancel.h"
#include "fe_utils/parallel_slot.h"
#include "fe_utils/query_utils.h"
Include dependency graph for parallel_slot.c:

Go to the source code of this file.

Macros

#define ERRCODE_UNDEFINED_TABLE   "42P01"
 

Functions

static int select_loop (int maxFd, fd_set *workerset)
 
static bool processQueryResult (ParallelSlot *slot, PGresult *result)
 
static bool consumeQueryResult (ParallelSlot *slot)
 
static int find_matching_idle_slot (const ParallelSlotArray *sa, const char *dbname)
 
static int find_unconnected_slot (const ParallelSlotArray *sa)
 
static int find_any_idle_slot (const ParallelSlotArray *sa)
 
static bool wait_on_slots (ParallelSlotArray *sa)
 
static void connect_slot (ParallelSlotArray *sa, int slotno, const char *dbname)
 
ParallelSlot * ParallelSlotsGetIdle (ParallelSlotArray *sa, const char *dbname)
 
ParallelSlotArray * ParallelSlotsSetup (int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
 
void ParallelSlotsAdoptConn (ParallelSlotArray *sa, PGconn *conn)
 
void ParallelSlotsTerminate (ParallelSlotArray *sa)
 
bool ParallelSlotsWaitCompletion (ParallelSlotArray *sa)
 
bool TableCommandResultHandler (PGresult *res, PGconn *conn, void *context)
 

Macro Definition Documentation

◆ ERRCODE_UNDEFINED_TABLE

#define ERRCODE_UNDEFINED_TABLE   "42P01"

Definition at line 28 of file parallel_slot.c.

Function Documentation

◆ connect_slot()

static void connect_slot ( ParallelSlotArray *  sa,
int  slotno,
const char *  dbname 
)
static

Definition at line 287 of file parallel_slot.c.

288 {
289  const char *old_override;
290  ParallelSlot *slot = &sa->slots[slotno];
291 
292  old_override = sa->cparams->override_dbname;
293  if (dbname)
294  sa->cparams->override_dbname = dbname;
295  slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
296  sa->cparams->override_dbname = old_override;
297 
298  if (PQsocket(slot->connection) >= FD_SETSIZE)
299  pg_fatal("too many jobs for this platform");
300 
301  /* Setup the connection using the supplied command, if any. */
302  if (sa->initcmd)
303  executeCommand(slot->connection, sa->initcmd, sa->echo);
304 }
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7271
#define pg_fatal(...)
static PGconn * connectDatabase(const char *dbname, const char *connection_string, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1642
static void executeCommand(PGconn *conn, const char *query)
Definition: pg_dumpall.c:1885
char * dbname
Definition: streamutil.c:51
PGconn * connection
Definition: parallel_slot.h:23

References connectDatabase(), ParallelSlot::connection, dbname, executeCommand(), pg_fatal, and PQsocket().

Referenced by ParallelSlotsGetIdle().

◆ consumeQueryResult()

static bool consumeQueryResult ( ParallelSlot *  slot)
static

Definition at line 58 of file parallel_slot.c.

59 {
60  bool ok = true;
61  PGresult *result;
62 
63  SetCancelConn(slot->connection);
64  while ((result = PQgetResult(slot->connection)) != NULL)
65  {
66  if (!processQueryResult(slot, result))
67  ok = false;
68  }
69  ResetCancelConn();
70  return ok;
71 }
void ResetCancelConn(void)
Definition: cancel.c:107
void SetCancelConn(PGconn *conn)
Definition: cancel.c:77
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2035
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
Definition: parallel_slot.c:39

References ParallelSlot::connection, PQgetResult(), processQueryResult(), ResetCancelConn(), and SetCancelConn().

Referenced by ParallelSlotsWaitCompletion().

◆ find_any_idle_slot()

static int find_any_idle_slot ( const ParallelSlotArray *  sa)
static

Definition at line 179 of file parallel_slot.c.

180 {
181  int i;
182 
183  for (i = 0; i < sa->numslots; i++)
184  if (!sa->slots[i].inUse)
185  return i;
186 
187  return -1;
188 }
int i
Definition: isn.c:73

References i.

Referenced by ParallelSlotsGetIdle().

◆ find_matching_idle_slot()

static int find_matching_idle_slot ( const ParallelSlotArray *  sa,
const char *  dbname 
)
static

Definition at line 135 of file parallel_slot.c.

136 {
137  int i;
138 
139  for (i = 0; i < sa->numslots; i++)
140  {
141  if (sa->slots[i].inUse)
142  continue;
143 
144  if (sa->slots[i].connection == NULL)
145  continue;
146 
147  if (dbname == NULL ||
148  strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149  return i;
150  }
151  return -1;
152 }
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:7091

References dbname, i, and PQdb().

Referenced by ParallelSlotsGetIdle().

◆ find_unconnected_slot()

static int find_unconnected_slot ( const ParallelSlotArray *  sa)
static

Definition at line 159 of file parallel_slot.c.

160 {
161  int i;
162 
163  for (i = 0; i < sa->numslots; i++)
164  {
165  if (sa->slots[i].inUse)
166  continue;
167 
168  if (sa->slots[i].connection == NULL)
169  return i;
170  }
171 
172  return -1;
173 }

References i.

Referenced by ParallelSlotsAdoptConn(), and ParallelSlotsGetIdle().

◆ ParallelSlotsAdoptConn()

void ParallelSlotsAdoptConn ( ParallelSlotArray *  sa,
PGconn *  conn 
)

Definition at line 427 of file parallel_slot.c.

428 {
429  int offset;
430 
431  offset = find_unconnected_slot(sa);
432  if (offset >= 0)
433  sa->slots[offset].connection = conn;
434  else
435  disconnectDatabase(conn);
436 }
void disconnectDatabase(PGconn *conn)
static int find_unconnected_slot(const ParallelSlotArray *sa)
PGconn * conn
Definition: streamutil.c:54

References conn, disconnectDatabase(), and find_unconnected_slot().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsGetIdle()

ParallelSlot* ParallelSlotsGetIdle ( ParallelSlotArray *  sa,
const char *  dbname 
)

Definition at line 338 of file parallel_slot.c.

339 {
340  int offset;
341 
342  Assert(sa);
343  Assert(sa->numslots > 0);
344 
345  while (1)
346  {
347  /* First choice: a slot already connected to the desired database. */
348  offset = find_matching_idle_slot(sa, dbname);
349  if (offset >= 0)
350  {
351  sa->slots[offset].inUse = true;
352  return &sa->slots[offset];
353  }
354 
355  /* Second choice: a slot not connected to any database. */
356  offset = find_unconnected_slot(sa);
357  if (offset >= 0)
358  {
359  connect_slot(sa, offset, dbname);
360  sa->slots[offset].inUse = true;
361  return &sa->slots[offset];
362  }
363 
364  /* Third choice: a slot connected to the wrong database. */
365  offset = find_any_idle_slot(sa);
366  if (offset >= 0)
367  {
368  disconnectDatabase(sa->slots[offset].connection);
369  sa->slots[offset].connection = NULL;
370  connect_slot(sa, offset, dbname);
371  sa->slots[offset].inUse = true;
372  return &sa->slots[offset];
373  }
374 
375  /*
376  * Fourth choice: block until one or more slots become available. If
377  * any slots hit a fatal error, we'll find out about that here and
378  * return NULL.
379  */
380  if (!wait_on_slots(sa))
381  return NULL;
382  }
383 }
Assert(fmt[strlen(fmt) - 1] !='\n')
static bool wait_on_slots(ParallelSlotArray *sa)
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
static int find_any_idle_slot(const ParallelSlotArray *sa)

References Assert(), connect_slot(), dbname, disconnectDatabase(), find_any_idle_slot(), find_matching_idle_slot(), find_unconnected_slot(), and wait_on_slots().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsSetup()

ParallelSlotArray* ParallelSlotsSetup ( int  numslots,
ConnParams *  cparams,
const char *  progname,
bool  echo,
const char *  initcmd 
)

Definition at line 395 of file parallel_slot.c.

397 {
398  ParallelSlotArray *sa;
399 
400  Assert(numslots > 0);
401  Assert(cparams != NULL);
402  Assert(progname != NULL);
403 
404  sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
405  numslots * sizeof(ParallelSlot));
406 
407  sa->numslots = numslots;
408  sa->cparams = cparams;
409  sa->progname = progname;
410  sa->echo = echo;
411  sa->initcmd = initcmd;
412 
413  return sa;
414 }
struct ParallelSlot ParallelSlot
Definition: parallel.h:52
const char * progname
Definition: main.c:45
void * palloc0(Size size)
Definition: mcxt.c:1257

References Assert(), palloc0(), and progname.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsTerminate()

void ParallelSlotsTerminate ( ParallelSlotArray *  sa)

Definition at line 446 of file parallel_slot.c.

447 {
448  int i;
449 
450  for (i = 0; i < sa->numslots; i++)
451  {
452  PGconn *conn = sa->slots[i].connection;
453 
454  if (conn == NULL)
455  continue;
456 
457  disconnectDatabase(conn);
458  }
459 }

References conn, disconnectDatabase(), and i.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ ParallelSlotsWaitCompletion()

bool ParallelSlotsWaitCompletion ( ParallelSlotArray *  sa)

Definition at line 468 of file parallel_slot.c.

469 {
470  int i;
471 
472  for (i = 0; i < sa->numslots; i++)
473  {
474  if (sa->slots[i].connection == NULL)
475  continue;
476  if (!consumeQueryResult(&sa->slots[i]))
477  return false;
478  /* Mark connection as idle */
479  sa->slots[i].inUse = false;
480  ParallelSlotClearHandler(&sa->slots[i]);
481  }
482 
483  return true;
484 }
static bool consumeQueryResult(ParallelSlot *slot)
Definition: parallel_slot.c:58
static void ParallelSlotClearHandler(ParallelSlot *slot)
Definition: parallel_slot.h:55

References consumeQueryResult(), i, and ParallelSlotClearHandler().

Referenced by main(), reindex_one_database(), and vacuum_one_database().

◆ processQueryResult()

static bool processQueryResult ( ParallelSlot *  slot,
PGresult *  result 
)
static

Definition at line 39 of file parallel_slot.c.

40 {
41  Assert(slot->handler != NULL);
42 
43  /* On failure, the handler should return false after freeing the result */
44  if (!slot->handler(result, slot->connection, slot->handler_context))
45  return false;
46 
47  /* Ok, we have to free it ourself */
48  PQclear(result);
49  return true;
50 }
ParallelSlotResultHandler handler
Definition: parallel_slot.h:32
void * handler_context
Definition: parallel_slot.h:33

References Assert(), ParallelSlot::connection, ParallelSlot::handler, ParallelSlot::handler_context, and PQclear().

Referenced by consumeQueryResult(), and wait_on_slots().

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset 
)
static

Definition at line 80 of file parallel_slot.c.

81 {
82  int i;
83  fd_set saveSet = *workerset;
84 
85  if (CancelRequested)
86  return -1;
87 
88  for (;;)
89  {
90  /*
91  * On Windows, we need to check once in a while for cancel requests;
92  * on other platforms we rely on select() returning when interrupted.
93  */
94  struct timeval *tvp;
95 #ifdef WIN32
96  struct timeval tv = {0, 1000000};
97 
98  tvp = &tv;
99 #else
100  tvp = NULL;
101 #endif
102 
103  *workerset = saveSet;
104  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
105 
106 #ifdef WIN32
107  if (i == SOCKET_ERROR)
108  {
109  i = -1;
110 
111  if (WSAGetLastError() == WSAEINTR)
112  errno = EINTR;
113  }
114 #endif
115 
116  if (i < 0 && errno == EINTR)
117  continue; /* ignore this */
118  if (i < 0 || CancelRequested)
119  return -1; /* but not this */
120  if (i == 0)
121  continue; /* timeout (Win32 only) */
122  break;
123  }
124 
125  return i;
126 }
volatile sig_atomic_t CancelRequested
Definition: cancel.c:59
#define EINTR
Definition: win32_port.h:382
#define select(n, r, w, e, timeout)
Definition: win32_port.h:505

References CancelRequested, EINTR, i, and select.

Referenced by wait_on_slots().

◆ TableCommandResultHandler()

bool TableCommandResultHandler ( PGresult *  res,
PGconn *  conn,
void *  context 
)

Definition at line 507 of file parallel_slot.c.

508 {
509  Assert(res != NULL);
510  Assert(conn != NULL);
511 
512  /*
513  * If it's an error, report it. Errors about a missing table are harmless
514  * so we continue processing; but die for other errors.
515  */
516  if (PQresultStatus(res) != PGRES_COMMAND_OK)
517  {
518  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
519 
520  pg_log_error("processing of database \"%s\" failed: %s",
521  PQdb(conn), PQerrorMessage(conn));
522 
523  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
524  {
525  PQclear(res);
526  return false;
527  }
528  }
529 
530  return true;
531 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7245
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3244
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3299
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:97
#define pg_log_error(...)
Definition: logging.h:106
#define ERRCODE_UNDEFINED_TABLE
Definition: parallel_slot.c:28
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:56

References Assert(), conn, ERRCODE_UNDEFINED_TABLE, PG_DIAG_SQLSTATE, pg_log_error, PGRES_COMMAND_OK, PQclear(), PQdb(), PQerrorMessage(), PQresultErrorField(), PQresultStatus(), and res.

Referenced by reindex_one_database(), and vacuum_one_database().

◆ wait_on_slots()

static bool wait_on_slots ( ParallelSlotArray *  sa)
static

Definition at line 196 of file parallel_slot.c.

197 {
198  int i;
199  fd_set slotset;
200  int maxFd = 0;
201  PGconn *cancelconn = NULL;
202 
203  /* We must reconstruct the fd_set for each call to select_loop */
204  FD_ZERO(&slotset);
205 
206  for (i = 0; i < sa->numslots; i++)
207  {
208  int sock;
209 
210  /* We shouldn't get here if we still have slots without connections */
211  Assert(sa->slots[i].connection != NULL);
212 
213  sock = PQsocket(sa->slots[i].connection);
214 
215  /*
216  * We don't really expect any connections to lose their sockets after
217  * startup, but just in case, cope by ignoring them.
218  */
219  if (sock < 0)
220  continue;
221 
222  /* Keep track of the first valid connection we see. */
223  if (cancelconn == NULL)
224  cancelconn = sa->slots[i].connection;
225 
226  FD_SET(sock, &slotset);
227  if (sock > maxFd)
228  maxFd = sock;
229  }
230 
231  /*
232  * If we get this far with no valid connections, processing cannot
233  * continue.
234  */
235  if (cancelconn == NULL)
236  return false;
237 
238  SetCancelConn(cancelconn);
239  i = select_loop(maxFd, &slotset);
240  ResetCancelConn();
241 
242  /* failure? */
243  if (i < 0)
244  return false;
245 
246  for (i = 0; i < sa->numslots; i++)
247  {
248  int sock;
249 
250  sock = PQsocket(sa->slots[i].connection);
251 
252  if (sock >= 0 && FD_ISSET(sock, &slotset))
253  {
254  /* select() says input is available, so consume it */
255  PQconsumeInput(sa->slots[i].connection);
256  }
257 
258  /* Collect result(s) as long as any are available */
259  while (!PQisBusy(sa->slots[i].connection))
260  {
261  PGresult *result = PQgetResult(sa->slots[i].connection);
262 
263  if (result != NULL)
264  {
265  /* Handle and discard the command result */
266  if (!processQueryResult(&sa->slots[i], result))
267  return false;
268  }
269  else
270  {
271  /* This connection has become idle */
272  sa->slots[i].inUse = false;
273  ParallelSlotClearHandler(&sa->slots[i]);
274  break;
275  }
276  }
277  }
278  return true;
279 }
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1957
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2004
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel_slot.c:80

References Assert(), i, ParallelSlotClearHandler(), PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), processQueryResult(), ResetCancelConn(), select_loop(), and SetCancelConn().

Referenced by ParallelSlotsGetIdle().