PostgreSQL Source Code  git master
scripts_parallel.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * scripts_parallel.c
4  * Parallel support for bin/scripts/
5  *
6  *
7  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * src/bin/scripts/scripts_parallel.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #ifdef WIN32
16 #define FD_SETSIZE 1024 /* must set before winsock2.h is included */
17 #endif
18 
19 #include "postgres_fe.h"
20 
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
23 #endif
24 
25 #include "common.h"
26 #include "common/logging.h"
27 #include "fe_utils/cancel.h"
28 #include "scripts_parallel.h"
29 
30 static void init_slot(ParallelSlot *slot, PGconn *conn);
31 static int select_loop(int maxFd, fd_set *workerset);
32 
33 static void
35 {
36  slot->connection = conn;
37  /* Initially assume connection is idle */
38  slot->isFree = true;
39 }
40 
41 /*
42  * Wait until a file descriptor from the given set becomes readable.
43  *
44  * Returns the number of ready descriptors, or -1 on failure (including
45  * getting a cancel request).
46  */
47 static int
48 select_loop(int maxFd, fd_set *workerset)
49 {
50  int i;
51  fd_set saveSet = *workerset;
52 
53  if (CancelRequested)
54  return -1;
55 
56  for (;;)
57  {
58  /*
59  * On Windows, we need to check once in a while for cancel requests;
60  * on other platforms we rely on select() returning when interrupted.
61  */
62  struct timeval *tvp;
63 #ifdef WIN32
64  struct timeval tv = {0, 1000000};
65 
66  tvp = &tv;
67 #else
68  tvp = NULL;
69 #endif
70 
71  *workerset = saveSet;
72  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
73 
74 #ifdef WIN32
75  if (i == SOCKET_ERROR)
76  {
77  i = -1;
78 
79  if (WSAGetLastError() == WSAEINTR)
80  errno = EINTR;
81  }
82 #endif
83 
84  if (i < 0 && errno == EINTR)
85  continue; /* ignore this */
86  if (i < 0 || CancelRequested)
87  return -1; /* but not this */
88  if (i == 0)
89  continue; /* timeout (Win32 only) */
90  break;
91  }
92 
93  return i;
94 }
95 
96 /*
97  * ParallelSlotsGetIdle
98  * Return a connection slot that is ready to execute a command.
99  *
100  * This returns the first slot we find that is marked isFree, if one is;
101  * otherwise, we loop on select() until one socket becomes available. When
102  * this happens, we read the whole set and mark as free all sockets that
103  * become available. If an error occurs, NULL is returned.
104  */
105 ParallelSlot *
106 ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
107 {
108  int i;
109  int firstFree = -1;
110 
111  /*
112  * Look for any connection currently free. If there is one, mark it as
113  * taken and let the caller know the slot to use.
114  */
115  for (i = 0; i < numslots; i++)
116  {
117  if (slots[i].isFree)
118  {
119  slots[i].isFree = false;
120  return slots + i;
121  }
122  }
123 
124  /*
125  * No free slot found, so wait until one of the connections has finished
126  * its task and return the available slot.
127  */
128  while (firstFree < 0)
129  {
130  fd_set slotset;
131  int maxFd = 0;
132 
133  /* We must reconstruct the fd_set for each call to select_loop */
134  FD_ZERO(&slotset);
135 
136  for (i = 0; i < numslots; i++)
137  {
138  int sock = PQsocket(slots[i].connection);
139 
140  /*
141  * We don't really expect any connections to lose their sockets
142  * after startup, but just in case, cope by ignoring them.
143  */
144  if (sock < 0)
145  continue;
146 
147  FD_SET(sock, &slotset);
148  if (sock > maxFd)
149  maxFd = sock;
150  }
151 
152  SetCancelConn(slots->connection);
153  i = select_loop(maxFd, &slotset);
154  ResetCancelConn();
155 
156  /* failure? */
157  if (i < 0)
158  return NULL;
159 
160  for (i = 0; i < numslots; i++)
161  {
162  int sock = PQsocket(slots[i].connection);
163 
164  if (sock >= 0 && FD_ISSET(sock, &slotset))
165  {
166  /* select() says input is available, so consume it */
167  PQconsumeInput(slots[i].connection);
168  }
169 
170  /* Collect result(s) as long as any are available */
171  while (!PQisBusy(slots[i].connection))
172  {
173  PGresult *result = PQgetResult(slots[i].connection);
174 
175  if (result != NULL)
176  {
177  /* Check and discard the command result */
178  if (!processQueryResult(slots[i].connection, result))
179  return NULL;
180  }
181  else
182  {
183  /* This connection has become idle */
184  slots[i].isFree = true;
185  if (firstFree < 0)
186  firstFree = i;
187  break;
188  }
189  }
190  }
191  }
192 
193  slots[firstFree].isFree = false;
194  return slots + firstFree;
195 }
196 
197 /*
198  * ParallelSlotsSetup
199  * Prepare a set of parallel slots to use on a given database.
200  *
201  * This creates and initializes a set of connections to the database
202  * using the information given by the caller, marking all parallel slots
203  * as free and ready to use. "conn" is an initial connection set up
204  * by the caller and is associated with the first slot in the parallel
205  * set.
206  */
207 ParallelSlot *
208 ParallelSlotsSetup(const char *dbname, const char *host, const char *port,
209  const char *username, bool prompt_password,
210  const char *progname, bool echo,
211  PGconn *conn, int numslots)
212 {
213  ParallelSlot *slots;
214  int i;
215 
216  Assert(conn != NULL);
217 
218  slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
219  init_slot(slots, conn);
220  if (numslots > 1)
221  {
222  for (i = 1; i < numslots; i++)
223  {
224  conn = connectDatabase(dbname, host, port, username, prompt_password,
225  progname, echo, false, true);
226 
227  /*
228  * Fail and exit immediately if trying to use a socket in an
229  * unsupported range. POSIX requires open(2) to use the lowest
230  * unused file descriptor and the hint given relies on that.
231  */
232  if (PQsocket(conn) >= FD_SETSIZE)
233  {
234  pg_log_fatal("too many jobs for this platform -- try %d", i);
235  exit(1);
236  }
237 
238  init_slot(slots + i, conn);
239  }
240  }
241 
242  return slots;
243 }
244 
245 /*
246  * ParallelSlotsTerminate
247  * Clean up a set of parallel slots
248  *
249  * Iterate through all connections in a given set of ParallelSlots and
250  * terminate all connections.
251  */
252 void
254 {
255  int i;
256 
257  for (i = 0; i < numslots; i++)
258  {
259  PGconn *conn = slots[i].connection;
260 
261  if (conn == NULL)
262  continue;
263 
264  disconnectDatabase(conn);
265  }
266 }
267 
268 /*
269  * ParallelSlotsWaitCompletion
270  *
271  * Wait for all connections to finish, returning false if at least one
272  * error has been found on the way.
273  */
274 bool
276 {
277  int i;
278 
279  for (i = 0; i < numslots; i++)
280  {
281  if (!consumeQueryResult((slots + i)->connection))
282  return false;
283  }
284 
285  return true;
286 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
bool processQueryResult(PGconn *conn, PGresult *result)
Definition: common.c:302
void ResetCancelConn(void)
Definition: cancel.c:100
static void init_slot(ParallelSlot *slot, PGconn *conn)
bool consumeQueryResult(PGconn *conn)
Definition: common.c:281
const char * progname
Definition: pg_standby.c:36
PGconn * conn
Definition: streamutil.c:54
bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
void SetCancelConn(PGconn *conn)
Definition: cancel.c:70
PGconn * connection
static int select_loop(int maxFd, fd_set *workerset)
void ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
static int port
Definition: pg_regress.c:90
ParallelSlot * ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
void disconnectDatabase(PGconn *conn)
Definition: common.c:179
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
ParallelSlot * ParallelSlotsSetup(const char *dbname, const char *host, const char *port, const char *username, bool prompt_password, const char *progname, bool echo, PGconn *conn, int numslots)
static char * username
Definition: initdb.c:133
#define Assert(condition)
Definition: c.h:745
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1635
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
char * dbname
Definition: streamutil.c:50
volatile sig_atomic_t CancelRequested
Definition: cancel.c:52
int i
#define EINTR
Definition: win32_port.h:323
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6690
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
#define pg_log_fatal(...)
Definition: logging.h:75