PostgreSQL Source Code  git master
scripts_parallel.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * scripts_parallel.c
4  * Parallel support for bin/scripts/
5  *
6  *
7  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/bin/scripts/scripts_parallel.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #ifdef WIN32
16 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
17 #endif
18 
19 #include "postgres_fe.h"
20 
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
23 #endif
24 
25 #include "common.h"
26 #include "common/logging.h"
27 #include "fe_utils/cancel.h"
28 #include "scripts_parallel.h"
29 
30 static void init_slot(ParallelSlot *slot, PGconn *conn);
31 static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
32 
33 static void
35 {
36  slot->connection = conn;
37  /* Initially assume connection is idle */
38  slot->isFree = true;
39 }
40 
41 /*
42  * Loop on select() until a descriptor from the given set becomes readable.
43  *
44  * If we get a cancel request while we're waiting, we forego all further
45  * processing and set the *aborting flag to true. The return value must be
46  * ignored in this case. Otherwise, *aborting is set to false.
47  */
48 static int
49 select_loop(int maxFd, fd_set *workerset, bool *aborting)
50 {
51  int i;
52  fd_set saveSet = *workerset;
53 
54  if (CancelRequested)
55  {
56  *aborting = true;
57  return -1;
58  }
59  else
60  *aborting = false;
61 
62  for (;;)
63  {
64  /*
65  * On Windows, we need to check once in a while for cancel requests;
66  * on other platforms we rely on select() returning when interrupted.
67  */
68  struct timeval *tvp;
69 #ifdef WIN32
70  struct timeval tv = {0, 1000000};
71 
72  tvp = &tv;
73 #else
74  tvp = NULL;
75 #endif
76 
77  *workerset = saveSet;
78  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
79 
80 #ifdef WIN32
81  if (i == SOCKET_ERROR)
82  {
83  i = -1;
84 
85  if (WSAGetLastError() == WSAEINTR)
86  errno = EINTR;
87  }
88 #endif
89 
90  if (i < 0 && errno == EINTR)
91  continue; /* ignore this */
92  if (i < 0 || CancelRequested)
93  *aborting = true; /* but not this */
94  if (i == 0)
95  continue; /* timeout (Win32 only) */
96  break;
97  }
98 
99  return i;
100 }
101 
102 /*
103  * ParallelSlotsGetIdle
104  * Return a connection slot that is ready to execute a command.
105  *
106  * This returns the first slot we find that is marked isFree, if one is;
107  * otherwise, we loop on select() until one socket becomes available. When
108  * this happens, we read the whole set and mark as free all sockets that
109  * become available. If an error occurs, NULL is returned.
110  */
111 ParallelSlot *
112 ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
113 {
114  int i;
115  int firstFree = -1;
116 
117  /*
118  * Look for any connection currently free. If there is one, mark it as
119  * taken and let the caller know the slot to use.
120  */
121  for (i = 0; i < numslots; i++)
122  {
123  if (slots[i].isFree)
124  {
125  slots[i].isFree = false;
126  return slots + i;
127  }
128  }
129 
130  /*
131  * No free slot found, so wait until one of the connections has finished
132  * its task and return the available slot.
133  */
134  while (firstFree < 0)
135  {
136  fd_set slotset;
137  int maxFd = 0;
138  bool aborting;
139 
140  /* We must reconstruct the fd_set for each call to select_loop */
141  FD_ZERO(&slotset);
142 
143  for (i = 0; i < numslots; i++)
144  {
145  int sock = PQsocket(slots[i].connection);
146 
147  /*
148  * We don't really expect any connections to lose their sockets
149  * after startup, but just in case, cope by ignoring them.
150  */
151  if (sock < 0)
152  continue;
153 
154  FD_SET(sock, &slotset);
155  if (sock > maxFd)
156  maxFd = sock;
157  }
158 
159  SetCancelConn(slots->connection);
160  i = select_loop(maxFd, &slotset, &aborting);
161  ResetCancelConn();
162 
163  if (aborting)
164  {
165  /*
166  * We set the cancel-receiving connection to the one in the zeroth
167  * slot above, so fetch the error from there.
168  */
170  return NULL;
171  }
172  Assert(i != 0);
173 
174  for (i = 0; i < numslots; i++)
175  {
176  int sock = PQsocket(slots[i].connection);
177 
178  if (sock >= 0 && FD_ISSET(sock, &slotset))
179  {
180  /* select() says input is available, so consume it */
181  PQconsumeInput(slots[i].connection);
182  }
183 
184  /* Collect result(s) as long as any are available */
185  while (!PQisBusy(slots[i].connection))
186  {
187  PGresult *result = PQgetResult(slots[i].connection);
188 
189  if (result != NULL)
190  {
191  /* Check and discard the command result */
192  if (!processQueryResult(slots[i].connection, result))
193  return NULL;
194  }
195  else
196  {
197  /* This connection has become idle */
198  slots[i].isFree = true;
199  if (firstFree < 0)
200  firstFree = i;
201  break;
202  }
203  }
204  }
205  }
206 
207  slots[firstFree].isFree = false;
208  return slots + firstFree;
209 }
210 
211 /*
212  * ParallelSlotsSetup
213  * Prepare a set of parallel slots to use on a given database.
214  *
215  * This creates and initializes a set of connections to the database
216  * using the information given by the caller, marking all parallel slots
217  * as free and ready to use. "conn" is an initial connection set up
218  * by the caller and is associated with the first slot in the parallel
219  * set.
220  */
221 ParallelSlot *
222 ParallelSlotsSetup(const char *dbname, const char *host, const char *port,
223  const char *username, bool prompt_password,
224  const char *progname, bool echo,
225  PGconn *conn, int numslots)
226 {
227  ParallelSlot *slots;
228  int i;
229 
230  Assert(conn != NULL);
231 
232  slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
233  init_slot(slots, conn);
234  if (numslots > 1)
235  {
236  for (i = 1; i < numslots; i++)
237  {
238  conn = connectDatabase(dbname, host, port, username, prompt_password,
239  progname, echo, false, true);
240 
241  /*
242  * Fail and exit immediately if trying to use a socket in an
243  * unsupported range. POSIX requires open(2) to use the lowest
244  * unused file descriptor and the hint given relies on that.
245  */
246  if (PQsocket(conn) >= FD_SETSIZE)
247  {
248  pg_log_fatal("too many jobs for this platform -- try %d", i);
249  exit(1);
250  }
251 
252  init_slot(slots + i, conn);
253  }
254  }
255 
256  return slots;
257 }
258 
259 /*
260  * ParallelSlotsTerminate
261  * Clean up a set of parallel slots
262  *
263  * Iterate through all connections in a given set of ParallelSlots and
264  * terminate all connections.
265  */
266 void
268 {
269  int i;
270 
271  for (i = 0; i < numslots; i++)
272  {
273  PGconn *conn = slots[i].connection;
274 
275  if (conn == NULL)
276  continue;
277 
278  disconnectDatabase(conn);
279  }
280 }
281 
282 /*
283  * ParallelSlotsWaitCompletion
284  *
285  * Wait for all connections to finish, returning false if at least one
286  * error has been found on the way.
287  */
288 bool
290 {
291  int i;
292 
293  for (i = 0; i < numslots; i++)
294  {
295  if (!consumeQueryResult((slots + i)->connection))
296  return false;
297  }
298 
299  return true;
300 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
bool processQueryResult(PGconn *conn, PGresult *result)
Definition: common.c:302
void ResetCancelConn(void)
Definition: cancel.c:89
static void init_slot(ParallelSlot *slot, PGconn *conn)
bool consumeQueryResult(PGconn *conn)
Definition: common.c:281
const char * progname
Definition: pg_standby.c:36
PGconn * conn
Definition: streamutil.c:54
bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
void SetCancelConn(PGconn *conn)
Definition: cancel.c:59
PGconn * connection
void ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
static int port
Definition: pg_regress.c:91
ParallelSlot * ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
void disconnectDatabase(PGconn *conn)
Definition: common.c:179
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
ParallelSlot * ParallelSlotsSetup(const char *dbname, const char *host, const char *port, const char *username, bool prompt_password, const char *progname, bool echo, PGconn *conn, int numslots)
static char * username
Definition: initdb.c:133
#define Assert(condition)
Definition: c.h:739
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1634
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
char * dbname
Definition: streamutil.c:50
bool CancelRequested
Definition: cancel.c:41
int i
#define EINTR
Definition: win32_port.h:323
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6641
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
static int select_loop(int maxFd, fd_set *workerset, bool *aborting)
#define pg_log_fatal(...)
Definition: logging.h:75