PostgreSQL Source Code  git master
scripts_parallel.c File Reference
#include "postgres_fe.h"
#include "common.h"
#include "common/logging.h"
#include "fe_utils/cancel.h"
#include "scripts_parallel.h"
Include dependency graph for scripts_parallel.c:

Go to the source code of this file.

Functions

static void init_slot (ParallelSlot *slot, PGconn *conn)
 
static int select_loop (int maxFd, fd_set *workerset, bool *aborting)
 
ParallelSlotParallelSlotsGetIdle (ParallelSlot *slots, int numslots)
 
ParallelSlotParallelSlotsSetup (const char *dbname, const char *host, const char *port, const char *username, bool prompt_password, const char *progname, bool echo, PGconn *conn, int numslots)
 
void ParallelSlotsTerminate (ParallelSlot *slots, int numslots)
 
bool ParallelSlotsWaitCompletion (ParallelSlot *slots, int numslots)
 

Function Documentation

◆ init_slot()

static void init_slot ( ParallelSlot slot,
PGconn conn 
)
static

Definition at line 34 of file scripts_parallel.c.

References conn, ParallelSlot::connection, and ParallelSlot::isFree.

Referenced by ParallelSlotsSetup().

35 {
36  slot->connection = conn;
37  /* Initially assume connection is idle */
38  slot->isFree = true;
39 }
PGconn * conn
Definition: streamutil.c:54
PGconn * connection

◆ ParallelSlotsGetIdle()

ParallelSlot* ParallelSlotsGetIdle ( ParallelSlot slots,
int  numslots 
)

Definition at line 112 of file scripts_parallel.c.

References Assert, ParallelSlot::connection, consumeQueryResult(), i, ParallelSlot::isFree, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), processQueryResult(), ResetCancelConn(), select_loop(), and SetCancelConn().

Referenced by reindex_one_database(), and vacuum_one_database().

113 {
114  int i;
115  int firstFree = -1;
116 
117  /*
118  * Look for any connection currently free. If there is one, mark it as
119  * taken and let the caller know the slot to use.
120  */
121  for (i = 0; i < numslots; i++)
122  {
123  if (slots[i].isFree)
124  {
125  slots[i].isFree = false;
126  return slots + i;
127  }
128  }
129 
130  /*
131  * No free slot found, so wait until one of the connections has finished
132  * its task and return the available slot.
133  */
134  while (firstFree < 0)
135  {
136  fd_set slotset;
137  int maxFd = 0;
138  bool aborting;
139 
140  /* We must reconstruct the fd_set for each call to select_loop */
141  FD_ZERO(&slotset);
142 
143  for (i = 0; i < numslots; i++)
144  {
145  int sock = PQsocket(slots[i].connection);
146 
147  /*
148  * We don't really expect any connections to lose their sockets
149  * after startup, but just in case, cope by ignoring them.
150  */
151  if (sock < 0)
152  continue;
153 
154  FD_SET(sock, &slotset);
155  if (sock > maxFd)
156  maxFd = sock;
157  }
158 
159  SetCancelConn(slots->connection);
160  i = select_loop(maxFd, &slotset, &aborting);
161  ResetCancelConn();
162 
163  if (aborting)
164  {
165  /*
166  * We set the cancel-receiving connection to the one in the zeroth
167  * slot above, so fetch the error from there.
168  */
170  return NULL;
171  }
172  Assert(i != 0);
173 
174  for (i = 0; i < numslots; i++)
175  {
176  int sock = PQsocket(slots[i].connection);
177 
178  if (sock >= 0 && FD_ISSET(sock, &slotset))
179  {
180  /* select() says input is available, so consume it */
181  PQconsumeInput(slots[i].connection);
182  }
183 
184  /* Collect result(s) as long as any are available */
185  while (!PQisBusy(slots[i].connection))
186  {
187  PGresult *result = PQgetResult(slots[i].connection);
188 
189  if (result != NULL)
190  {
191  /* Check and discard the command result */
192  if (!processQueryResult(slots[i].connection, result))
193  return NULL;
194  }
195  else
196  {
197  /* This connection has become idle */
198  slots[i].isFree = true;
199  if (firstFree < 0)
200  firstFree = i;
201  break;
202  }
203  }
204  }
205  }
206 
207  slots[firstFree].isFree = false;
208  return slots + firstFree;
209 }
bool processQueryResult(PGconn *conn, PGresult *result)
Definition: common.c:302
void ResetCancelConn(void)
Definition: cancel.c:100
bool consumeQueryResult(PGconn *conn)
Definition: common.c:281
void SetCancelConn(PGconn *conn)
Definition: cancel.c:70
PGconn * connection
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
#define Assert(condition)
Definition: c.h:739
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
int i
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6649
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
static int select_loop(int maxFd, fd_set *workerset, bool *aborting)

◆ ParallelSlotsSetup()

ParallelSlot* ParallelSlotsSetup ( const char *  dbname,
const char *  host,
const char *  port,
const char *  username,
bool  prompt_password,
const char *  progname,
bool  echo,
PGconn conn,
int  numslots 
)

Definition at line 222 of file scripts_parallel.c.

References Assert, connectDatabase(), i, init_slot(), pg_log_fatal, pg_malloc(), and PQsocket().

Referenced by reindex_one_database(), and vacuum_one_database().

226 {
227  ParallelSlot *slots;
228  int i;
229 
230  Assert(conn != NULL);
231 
232  slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
233  init_slot(slots, conn);
234  if (numslots > 1)
235  {
236  for (i = 1; i < numslots; i++)
237  {
238  conn = connectDatabase(dbname, host, port, username, prompt_password,
239  progname, echo, false, true);
240 
241  /*
242  * Fail and exit immediately if trying to use a socket in an
243  * unsupported range. POSIX requires open(2) to use the lowest
244  * unused file descriptor and the hint given relies on that.
245  */
246  if (PQsocket(conn) >= FD_SETSIZE)
247  {
248  pg_log_fatal("too many jobs for this platform -- try %d", i);
249  exit(1);
250  }
251 
252  init_slot(slots + i, conn);
253  }
254  }
255 
256  return slots;
257 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static void init_slot(ParallelSlot *slot, PGconn *conn)
const char * progname
Definition: pg_standby.c:36
static int port
Definition: pg_regress.c:91
static char * username
Definition: initdb.c:133
#define Assert(condition)
Definition: c.h:739
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1634
char * dbname
Definition: streamutil.c:50
int i
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6649
#define pg_log_fatal(...)
Definition: logging.h:75

◆ ParallelSlotsTerminate()

void ParallelSlotsTerminate ( ParallelSlot slots,
int  numslots 
)

Definition at line 267 of file scripts_parallel.c.

References conn, ParallelSlot::connection, disconnectDatabase(), and i.

Referenced by reindex_one_database(), and vacuum_one_database().

268 {
269  int i;
270 
271  for (i = 0; i < numslots; i++)
272  {
273  PGconn *conn = slots[i].connection;
274 
275  if (conn == NULL)
276  continue;
277 
278  disconnectDatabase(conn);
279  }
280 }
PGconn * conn
Definition: streamutil.c:54
PGconn * connection
void disconnectDatabase(PGconn *conn)
Definition: common.c:179
int i

◆ ParallelSlotsWaitCompletion()

bool ParallelSlotsWaitCompletion ( ParallelSlot slots,
int  numslots 
)

Definition at line 289 of file scripts_parallel.c.

References consumeQueryResult(), and i.

Referenced by reindex_one_database(), and vacuum_one_database().

290 {
291  int i;
292 
293  for (i = 0; i < numslots; i++)
294  {
295  if (!consumeQueryResult((slots + i)->connection))
296  return false;
297  }
298 
299  return true;
300 }
bool consumeQueryResult(PGconn *conn)
Definition: common.c:281
int i

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset,
bool aborting 
)
static

Definition at line 49 of file scripts_parallel.c.

References CancelRequested, EINTR, i, and select.

Referenced by ParallelSlotsGetIdle().

50 {
51  int i;
52  fd_set saveSet = *workerset;
53 
54  if (CancelRequested)
55  {
56  *aborting = true;
57  return -1;
58  }
59  else
60  *aborting = false;
61 
62  for (;;)
63  {
64  /*
65  * On Windows, we need to check once in a while for cancel requests;
66  * on other platforms we rely on select() returning when interrupted.
67  */
68  struct timeval *tvp;
69 #ifdef WIN32
70  struct timeval tv = {0, 1000000};
71 
72  tvp = &tv;
73 #else
74  tvp = NULL;
75 #endif
76 
77  *workerset = saveSet;
78  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
79 
80 #ifdef WIN32
81  if (i == SOCKET_ERROR)
82  {
83  i = -1;
84 
85  if (WSAGetLastError() == WSAEINTR)
86  errno = EINTR;
87  }
88 #endif
89 
90  if (i < 0 && errno == EINTR)
91  continue; /* ignore this */
92  if (i < 0 || CancelRequested)
93  *aborting = true; /* but not this */
94  if (i == 0)
95  continue; /* timeout (Win32 only) */
96  break;
97  }
98 
99  return i;
100 }
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
volatile sig_atomic_t CancelRequested
Definition: cancel.c:52
int i
#define EINTR
Definition: win32_port.h:323