PostgreSQL Source Code  git master
scripts_parallel.c File Reference
#include "postgres_fe.h"
#include "common.h"
#include "common/logging.h"
#include "fe_utils/cancel.h"
#include "scripts_parallel.h"
Include dependency graph for scripts_parallel.c:

Go to the source code of this file.

Functions

static void init_slot (ParallelSlot *slot, PGconn *conn)
 
static int select_loop (int maxFd, fd_set *workerset)
 
ParallelSlotParallelSlotsGetIdle (ParallelSlot *slots, int numslots)
 
ParallelSlotParallelSlotsSetup (const char *dbname, const char *host, const char *port, const char *username, bool prompt_password, const char *progname, bool echo, PGconn *conn, int numslots)
 
void ParallelSlotsTerminate (ParallelSlot *slots, int numslots)
 
bool ParallelSlotsWaitCompletion (ParallelSlot *slots, int numslots)
 

Function Documentation

◆ init_slot()

static void init_slot ( ParallelSlot slot,
PGconn conn 
)
static

Definition at line 34 of file scripts_parallel.c.

References conn, ParallelSlot::connection, and ParallelSlot::isFree.

Referenced by ParallelSlotsSetup().

35 {
36  slot->connection = conn;
37  /* Initially assume connection is idle */
38  slot->isFree = true;
39 }
PGconn * conn
Definition: streamutil.c:54
PGconn * connection

◆ ParallelSlotsGetIdle()

ParallelSlot* ParallelSlotsGetIdle ( ParallelSlot slots,
int  numslots 
)

Definition at line 106 of file scripts_parallel.c.

References ParallelSlot::connection, i, ParallelSlot::isFree, PQconsumeInput(), PQgetResult(), PQisBusy(), PQsocket(), processQueryResult(), ResetCancelConn(), select_loop(), and SetCancelConn().

Referenced by reindex_one_database(), and vacuum_one_database().

107 {
108  int i;
109  int firstFree = -1;
110 
111  /*
112  * Look for any connection currently free. If there is one, mark it as
113  * taken and let the caller know the slot to use.
114  */
115  for (i = 0; i < numslots; i++)
116  {
117  if (slots[i].isFree)
118  {
119  slots[i].isFree = false;
120  return slots + i;
121  }
122  }
123 
124  /*
125  * No free slot found, so wait until one of the connections has finished
126  * its task and return the available slot.
127  */
128  while (firstFree < 0)
129  {
130  fd_set slotset;
131  int maxFd = 0;
132 
133  /* We must reconstruct the fd_set for each call to select_loop */
134  FD_ZERO(&slotset);
135 
136  for (i = 0; i < numslots; i++)
137  {
138  int sock = PQsocket(slots[i].connection);
139 
140  /*
141  * We don't really expect any connections to lose their sockets
142  * after startup, but just in case, cope by ignoring them.
143  */
144  if (sock < 0)
145  continue;
146 
147  FD_SET(sock, &slotset);
148  if (sock > maxFd)
149  maxFd = sock;
150  }
151 
152  SetCancelConn(slots->connection);
153  i = select_loop(maxFd, &slotset);
154  ResetCancelConn();
155 
156  /* failure? */
157  if (i < 0)
158  return NULL;
159 
160  for (i = 0; i < numslots; i++)
161  {
162  int sock = PQsocket(slots[i].connection);
163 
164  if (sock >= 0 && FD_ISSET(sock, &slotset))
165  {
166  /* select() says input is available, so consume it */
167  PQconsumeInput(slots[i].connection);
168  }
169 
170  /* Collect result(s) as long as any are available */
171  while (!PQisBusy(slots[i].connection))
172  {
173  PGresult *result = PQgetResult(slots[i].connection);
174 
175  if (result != NULL)
176  {
177  /* Check and discard the command result */
178  if (!processQueryResult(slots[i].connection, result))
179  return NULL;
180  }
181  else
182  {
183  /* This connection has become idle */
184  slots[i].isFree = true;
185  if (firstFree < 0)
186  firstFree = i;
187  break;
188  }
189  }
190  }
191  }
192 
193  slots[firstFree].isFree = false;
194  return slots + firstFree;
195 }
bool processQueryResult(PGconn *conn, PGresult *result)
Definition: common.c:302
void ResetCancelConn(void)
Definition: cancel.c:100
void SetCancelConn(PGconn *conn)
Definition: cancel.c:70
PGconn * connection
static int select_loop(int maxFd, fd_set *workerset)
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1704
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:1754
int i
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6690
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:1778

◆ ParallelSlotsSetup()

ParallelSlot* ParallelSlotsSetup ( const char *  dbname,
const char *  host,
const char *  port,
const char *  username,
bool  prompt_password,
const char *  progname,
bool  echo,
PGconn conn,
int  numslots 
)

Definition at line 208 of file scripts_parallel.c.

References Assert, connectDatabase(), i, init_slot(), pg_log_fatal, pg_malloc(), and PQsocket().

Referenced by reindex_one_database(), and vacuum_one_database().

212 {
213  ParallelSlot *slots;
214  int i;
215 
216  Assert(conn != NULL);
217 
218  slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
219  init_slot(slots, conn);
220  if (numslots > 1)
221  {
222  for (i = 1; i < numslots; i++)
223  {
224  conn = connectDatabase(dbname, host, port, username, prompt_password,
225  progname, echo, false, true);
226 
227  /*
228  * Fail and exit immediately if trying to use a socket in an
229  * unsupported range. POSIX requires open(2) to use the lowest
230  * unused file descriptor and the hint given relies on that.
231  */
232  if (PQsocket(conn) >= FD_SETSIZE)
233  {
234  pg_log_fatal("too many jobs for this platform -- try %d", i);
235  exit(1);
236  }
237 
238  init_slot(slots + i, conn);
239  }
240  }
241 
242  return slots;
243 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static void init_slot(ParallelSlot *slot, PGconn *conn)
const char * progname
Definition: pg_standby.c:36
static int port
Definition: pg_regress.c:90
static char * username
Definition: initdb.c:133
#define Assert(condition)
Definition: c.h:745
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
Definition: pg_dumpall.c:1635
char * dbname
Definition: streamutil.c:50
int i
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:6690
#define pg_log_fatal(...)
Definition: logging.h:75

◆ ParallelSlotsTerminate()

void ParallelSlotsTerminate ( ParallelSlot slots,
int  numslots 
)

Definition at line 253 of file scripts_parallel.c.

References conn, ParallelSlot::connection, disconnectDatabase(), and i.

Referenced by reindex_one_database(), and vacuum_one_database().

254 {
255  int i;
256 
257  for (i = 0; i < numslots; i++)
258  {
259  PGconn *conn = slots[i].connection;
260 
261  if (conn == NULL)
262  continue;
263 
264  disconnectDatabase(conn);
265  }
266 }
PGconn * conn
Definition: streamutil.c:54
PGconn * connection
void disconnectDatabase(PGconn *conn)
Definition: common.c:179
int i

◆ ParallelSlotsWaitCompletion()

bool ParallelSlotsWaitCompletion ( ParallelSlot slots,
int  numslots 
)

Definition at line 275 of file scripts_parallel.c.

References consumeQueryResult(), and i.

Referenced by reindex_one_database(), and vacuum_one_database().

276 {
277  int i;
278 
279  for (i = 0; i < numslots; i++)
280  {
281  if (!consumeQueryResult((slots + i)->connection))
282  return false;
283  }
284 
285  return true;
286 }
bool consumeQueryResult(PGconn *conn)
Definition: common.c:281
int i

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset 
)
static

Definition at line 48 of file scripts_parallel.c.

References CancelRequested, EINTR, i, and select.

Referenced by ParallelSlotsGetIdle().

49 {
50  int i;
51  fd_set saveSet = *workerset;
52 
53  if (CancelRequested)
54  return -1;
55 
56  for (;;)
57  {
58  /*
59  * On Windows, we need to check once in a while for cancel requests;
60  * on other platforms we rely on select() returning when interrupted.
61  */
62  struct timeval *tvp;
63 #ifdef WIN32
64  struct timeval tv = {0, 1000000};
65 
66  tvp = &tv;
67 #else
68  tvp = NULL;
69 #endif
70 
71  *workerset = saveSet;
72  i = select(maxFd + 1, workerset, NULL, NULL, tvp);
73 
74 #ifdef WIN32
75  if (i == SOCKET_ERROR)
76  {
77  i = -1;
78 
79  if (WSAGetLastError() == WSAEINTR)
80  errno = EINTR;
81  }
82 #endif
83 
84  if (i < 0 && errno == EINTR)
85  continue; /* ignore this */
86  if (i < 0 || CancelRequested)
87  return -1; /* but not this */
88  if (i == 0)
89  continue; /* timeout (Win32 only) */
90  break;
91  }
92 
93  return i;
94 }
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
volatile sig_atomic_t CancelRequested
Definition: cancel.c:52
int i
#define EINTR
Definition: win32_port.h:323