15 #if defined(WIN32) && FD_SETSIZE < 1024
16 #error FD_SETSIZE needs to have been increased
28 #define ERRCODE_UNDEFINED_TABLE "42P01"
30 static int select_loop(
int maxFd, fd_set *workerset);
83 fd_set saveSet = *workerset;
96 struct timeval tv = {0, 1000000};
103 *workerset = saveSet;
104 i =
select(maxFd + 1, workerset, NULL, NULL, tvp);
107 if (
i == SOCKET_ERROR)
111 if (WSAGetLastError() == WSAEINTR)
116 if (
i < 0 && errno ==
EINTR)
139 for (
i = 0;
i <
sa->numslots;
i++)
141 if (
sa->slots[
i].inUse)
144 if (
sa->slots[
i].connection == NULL)
163 for (
i = 0;
i <
sa->numslots;
i++)
165 if (
sa->slots[
i].inUse)
168 if (
sa->slots[
i].connection == NULL)
183 for (
i = 0;
i <
sa->numslots;
i++)
184 if (!
sa->slots[
i].inUse)
201 PGconn *cancelconn = NULL;
206 for (
i = 0;
i <
sa->numslots;
i++)
223 if (cancelconn == NULL)
224 cancelconn =
sa->slots[
i].connection;
226 FD_SET(sock, &slotset);
235 if (cancelconn == NULL)
246 for (
i = 0;
i <
sa->numslots;
i++)
252 if (sock >= 0 && FD_ISSET(sock, &slotset))
272 sa->slots[
i].inUse =
false;
289 const char *old_override;
292 old_override =
sa->cparams->override_dbname;
294 sa->cparams->override_dbname =
dbname;
296 sa->cparams->override_dbname = old_override;
315 if (slotno >= FD_SETSIZE)
317 pg_log_error(
"too many jobs for this platform: %d", slotno);
324 if (
fd >= FD_SETSIZE)
326 pg_log_error(
"socket file descriptor out of range for select(): %d",
384 sa->slots[offset].inUse =
true;
385 return &
sa->slots[offset];
393 sa->slots[offset].inUse =
true;
394 return &
sa->slots[offset];
402 sa->slots[offset].connection = NULL;
404 sa->slots[offset].inUse =
true;
405 return &
sa->slots[offset];
429 bool echo,
const char *initcmd)
440 sa->numslots = numslots;
441 sa->cparams = cparams;
444 sa->initcmd = initcmd;
466 sa->slots[offset].connection =
conn;
483 for (
i = 0;
i <
sa->numslots;
i++)
505 for (
i = 0;
i <
sa->numslots;
i++)
507 if (
sa->slots[
i].connection == NULL)
512 sa->slots[
i].inUse =
false;
553 pg_log_error(
"processing of database \"%s\" failed: %s",
struct ParallelSlot ParallelSlot
#define Assert(condition)
volatile sig_atomic_t CancelRequested
void ResetCancelConn(void)
void SetCancelConn(PGconn *conn)
void disconnectDatabase(PGconn *conn)
static void PGresult * res
char * PQdb(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
int PQconsumeInput(PGconn *conn)
int PQisBusy(PGconn *conn)
char * PQresultErrorField(const PGresult *res, int fieldcode)
PGresult * PQgetResult(PGconn *conn)
#define pg_log_error(...)
#define pg_log_error_hint(...)
void * palloc0(Size size)
static bool wait_on_slots(ParallelSlotArray *sa)
ParallelSlot * ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
ParallelSlotArray * ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
static int select_loop(int maxFd, fd_set *workerset)
bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
#define ERRCODE_UNDEFINED_TABLE
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static bool consumeQueryResult(ParallelSlot *slot)
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
void ParallelSlotsTerminate(ParallelSlotArray *sa)
static int find_unconnected_slot(const ParallelSlotArray *sa)
void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
static int find_any_idle_slot(const ParallelSlotArray *sa)
static void ParallelSlotClearHandler(ParallelSlot *slot)
static PGconn * connectDatabase(const char *dbname, const char *connection_string, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
static void executeCommand(PGconn *conn, const char *query)
static int fd(const char *x, int i)
ParallelSlotResultHandler handler
#define select(n, r, w, e, timeout)