PostgreSQL Source Code  git master
scripts_parallel.c File Reference
#include "postgres_fe.h"
#include "common.h"
#include "common/logging.h"
#include "scripts_parallel.h"
Include dependency graph for scripts_parallel.c:

Go to the source code of this file.

Functions

static void init_slot (ParallelSlot *slot, PGconn *conn)
 
static int select_loop (int maxFd, fd_set *workerset, bool *aborting)
 
ParallelSlot * ParallelSlotsGetIdle (ParallelSlot *slots, int numslots)
 
ParallelSlot * ParallelSlotsSetup (const char *dbname, const char *host, const char *port, const char *username, bool prompt_password, const char *progname, bool echo, PGconn *conn, int numslots)
 
void ParallelSlotsTerminate (ParallelSlot *slots, int numslots)
 
bool ParallelSlotsWaitCompletion (ParallelSlot *slots, int numslots)
 

Function Documentation

◆ init_slot()

static void init_slot ( ParallelSlot *  slot,
PGconn *  conn 
)
static

Definition at line 33 of file scripts_parallel.c.

References conn, ParallelSlot::connection, and ParallelSlot::isFree.

Referenced by ParallelSlotsSetup().

34 {
35  slot->connection = conn;
36  /* Initially assume connection is idle */
37  slot->isFree = true;
38 }
PGconn * conn
Definition: streamutil.c:54
PGconn * connection

◆ ParallelSlotsGetIdle()

ParallelSlot* ParallelSlotsGetIdle ( ParallelSlot *  slots,
int  numslots 
)

Definition at line 111 of file scripts_parallel.c.

References Assert, ParallelSlot::connection, consumeQueryResult(), i, ParallelSlot::isFree, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), processQueryResult(), ResetCancelConn(), select_loop(), and SetCancelConn().

Referenced by reindex_one_database(), and vacuum_one_database().

112 {
113  int i;
114  int firstFree = -1;
115 
116  /*
117  * Look for any connection currently free. If there is one, mark it as
118  * taken and let the caller know the slot to use.
119  */
120  for (i = 0; i < numslots; i++)
121  {
122  if (slots[i].isFree)
123  {
124  slots[i].isFree = false;
125  return slots + i;
126  }
127  }
128 
129  /*
130  * No free slot found, so wait until one of the connections has finished
131  * its task and return the available slot.
132  */
133  while (firstFree < 0)
134  {
135  fd_set slotset;
136  int maxFd = 0;
137  bool aborting;
138 
139  /* We must reconstruct the fd_set for each call to select_loop */
140  FD_ZERO(&slotset);
141 
142  for (i = 0; i < numslots; i++)
143  {
144  int sock = PQsocket(slots[i].connection);
145 
146  /*
147  * We don't really expect any connections to lose their sockets
148  * after startup, but just in case, cope by ignoring them.
149  */
150  if (sock < 0)
151  continue;
152 
153  FD_SET(sock, &slotset);
154  if (sock > maxFd)
155  maxFd = sock;
156  }
157 
158  SetCancelConn(slots->connection);
159  i = select_loop(maxFd, &slotset, &aborting);
160  ResetCancelConn();
161 
162  if (aborting)
163  {
164  /*
165  * We set the cancel-receiving connection to the one in the zeroth
166  * slot above, so fetch the error from there.
167  */
168  consumeQueryResult(slots->connection);
169  return NULL;
170  }
171  Assert(i != 0);
172 
173  for (i = 0; i < numslots; i++)
174  {
175  int sock = PQsocket(slots[i].connection);
176 
177  if (sock >= 0 && FD_ISSET(sock, &slotset))
178  {
179  /* select() says input is available, so consume it */
180  PQconsumeInput(slots[i].connection);
181  }
182 
183  /* Collect result(s) as long as any are available */
184  while (!PQisBusy(slots[i].connection))
185  {
186  PGresult *result = PQgetResult(slots[i].connection);
187 
188  if (result != NULL)
189  {
190  /* Check and discard the command result */
191  if (!processQueryResult(slots[i].connection, result))
192  return NULL;
193  }
194  else
195  {
196  /* This connection has become idle */
197  slots[i].isFree = true;
198  if (firstFree < 0)
199  firstFree = i;
200  break;
201  }
202  }
203  }
204  }
205 
206  slots[firstFree].isFree = false;
207  return slots + firstFree;
208 }
bool processQueryResult(PGconn *conn, PGresult *result)
Definition: common.c:309
bool consumeQueryResult(PGconn *conn)
Definition: common.c:288
PGconn * connection
void SetCancelConn(void)
Definition: common.c:437
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
#define Assert(condition)
Definition: c.h:733
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
int i
void ResetCancelConn(void)
Definition: common.c:467
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6635
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778
static int select_loop(int maxFd, fd_set *workerset, bool *aborting)

◆ ParallelSlotsSetup()

ParallelSlot* ParallelSlotsSetup ( const char *  dbname,
const char *  host,
const char *  port,
const char *  username,
bool  prompt_password,
const char *  progname,
bool  echo,
PGconn *  conn,
int  numslots 
)

Definition at line 221 of file scripts_parallel.c.

References Assert, connectDatabase(), i, init_slot(), pg_log_fatal, pg_malloc(), and PQsocket().

Referenced by reindex_one_database(), and vacuum_one_database().

225 {
226  ParallelSlot *slots;
227  int i;
228 
229  Assert(conn != NULL);
230 
231  slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
232  init_slot(slots, conn);
233  if (numslots > 1)
234  {
235  for (i = 1; i < numslots; i++)
236  {
237  conn = connectDatabase(dbname, host, port, username, prompt_password,
238  progname, echo, false, true);
239 
240  /*
241  * Fail and exit immediately if trying to use a socket in an
242  * unsupported range. POSIX requires open(2) to use the lowest
243  * unused file descriptor and the hint given relies on that.
244  */
245  if (PQsocket(conn) >= FD_SETSIZE)
246  {
247  pg_log_fatal("too many jobs for this platform -- try %d", i);
248  exit(1);
249  }
250 
251  init_slot(slots + i, conn);
252  }
253  }
254 
255  return slots;
256 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static void init_slot(ParallelSlot *slot, PGconn *conn)
const char * progname
Definition: pg_standby.c:36
static int port
Definition: pg_regress.c:91
static char * username
Definition: initdb.c:133
#define Assert(condition)
Definition: c.h:733
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1634
char * dbname
Definition: streamutil.c:50
int i
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6635
#define pg_log_fatal(...)
Definition: logging.h:75

◆ ParallelSlotsTerminate()

void ParallelSlotsTerminate ( ParallelSlot *  slots,
int  numslots 
)

Definition at line 266 of file scripts_parallel.c.

References conn, ParallelSlot::connection, disconnectDatabase(), and i.

Referenced by reindex_one_database(), and vacuum_one_database().

267 {
268  int i;
269 
270  for (i = 0; i < numslots; i++)
271  {
272  PGconn *conn = slots[i].connection;
273 
274  if (conn == NULL)
275  continue;
276 
277  disconnectDatabase(conn);
278  }
279 }
PGconn * conn
Definition: streamutil.c:54
PGconn * connection
void disconnectDatabase(PGconn *conn)
Definition: common.c:186
int i

◆ ParallelSlotsWaitCompletion()

bool ParallelSlotsWaitCompletion ( ParallelSlot *  slots,
int  numslots 
)

Definition at line 288 of file scripts_parallel.c.

References consumeQueryResult(), and i.

Referenced by reindex_one_database(), and vacuum_one_database().

289 {
290  int i;
291 
292  for (i = 0; i < numslots; i++)
293  {
294  if (!consumeQueryResult((slots + i)->connection))
295  return false;
296  }
297 
298  return true;
299 }
bool consumeQueryResult(PGconn *conn)
Definition: common.c:288
int i

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset,
bool *  aborting 
)
static

Definition at line 48 of file scripts_parallel.c.

References CancelRequested, EINTR, i, and select.

Referenced by ParallelSlotsGetIdle().

49 {
50  int i;
51  fd_set saveSet = *workerset;
52 
53  if (CancelRequested)
54  {
55  *aborting = true;
56  return -1;
57  }
58  else
59  *aborting = false;
60 
61  for (;;)
62  {
63  /*
64  * On Windows, we need to check once in a while for cancel requests;
65  * on other platforms we rely on select() returning when interrupted.
66  */
67  struct timeval *tvp;
68 #ifdef WIN32
69  struct timeval tv = {0, 1000000};
70 
71  tvp = &tv;
72 #else
73  tvp = NULL;
74 #endif
75 
76  *workerset = saveSet;
77  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
78 
79 #ifdef WIN32
80  if (i == SOCKET_ERROR)
81  {
82  i = -1;
83 
84  if (WSAGetLastError() == WSAEINTR)
85  errno = EINTR;
86  }
87 #endif
88 
89  if (i < 0 && errno == EINTR)
90  continue; /* ignore this */
91  if (i < 0 || CancelRequested)
92  *aborting = true; /* but not this */
93  if (i == 0)
94  continue; /* timeout (Win32 only) */
95  break;
96  }
97 
98  return i;
99 }
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
bool CancelRequested
Definition: common.c:29
int i
#define EINTR
Definition: win32_port.h:323