16 #define FD_SETSIZE 1024
21 #ifdef HAVE_SYS_SELECT_H
22 #include <sys/select.h>
30 #define ERRCODE_UNDEFINED_TABLE "42P01"
32 static int select_loop(
int maxFd, fd_set *workerset);
85 fd_set saveSet = *workerset;
98 struct timeval tv = {0, 1000000};
105 *workerset = saveSet;
106 i =
select(maxFd + 1, workerset, NULL, NULL, tvp);
109 if (
i == SOCKET_ERROR)
113 if (WSAGetLastError() == WSAEINTR)
118 if (
i < 0 && errno ==
EINTR)
141 for (
i = 0;
i <
sa->numslots;
i++)
143 if (
sa->slots[
i].inUse)
146 if (
sa->slots[
i].connection == NULL)
165 for (
i = 0;
i <
sa->numslots;
i++)
167 if (
sa->slots[
i].inUse)
170 if (
sa->slots[
i].connection == NULL)
185 for (
i = 0;
i <
sa->numslots;
i++)
186 if (!
sa->slots[
i].inUse)
203 PGconn *cancelconn = NULL;
208 for (
i = 0;
i <
sa->numslots;
i++)
225 if (cancelconn == NULL)
226 cancelconn =
sa->slots[
i].connection;
228 FD_SET(sock, &slotset);
237 if (cancelconn == NULL)
248 for (
i = 0;
i <
sa->numslots;
i++)
254 if (sock >= 0 && FD_ISSET(sock, &slotset))
274 sa->slots[
i].inUse =
false;
291 const char *old_override;
294 old_override =
sa->cparams->override_dbname;
296 sa->cparams->override_dbname =
dbname;
298 sa->cparams->override_dbname = old_override;
301 pg_fatal(
"too many jobs for this platform");
353 sa->slots[offset].inUse =
true;
354 return &
sa->slots[offset];
362 sa->slots[offset].inUse =
true;
363 return &
sa->slots[offset];
371 sa->slots[offset].connection = NULL;
373 sa->slots[offset].inUse =
true;
374 return &
sa->slots[offset];
398 bool echo,
const char *initcmd)
409 sa->numslots = numslots;
410 sa->cparams = cparams;
413 sa->initcmd = initcmd;
435 sa->slots[offset].connection =
conn;
452 for (
i = 0;
i <
sa->numslots;
i++)
474 for (
i = 0;
i <
sa->numslots;
i++)
476 if (
sa->slots[
i].connection == NULL)
519 pg_log_error(
"processing of database \"%s\" failed: %s",
struct ParallelSlot ParallelSlot
#define offsetof(type, field)
volatile sig_atomic_t CancelRequested
void ResetCancelConn(void)
void SetCancelConn(PGconn *conn)
void disconnectDatabase(PGconn *conn)
static void PGresult * res
char * PQdb(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
int PQconsumeInput(PGconn *conn)
int PQisBusy(PGconn *conn)
char * PQresultErrorField(const PGresult *res, int fieldcode)
PGresult * PQgetResult(PGconn *conn)
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
void * palloc0(Size size)
static bool wait_on_slots(ParallelSlotArray *sa)
ParallelSlot * ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
ParallelSlotArray * ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
static int select_loop(int maxFd, fd_set *workerset)
bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
#define ERRCODE_UNDEFINED_TABLE
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static bool consumeQueryResult(ParallelSlot *slot)
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
void ParallelSlotsTerminate(ParallelSlotArray *sa)
static int find_unconnected_slot(const ParallelSlotArray *sa)
void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
static int find_any_idle_slot(const ParallelSlotArray *sa)
static void ParallelSlotClearHandler(ParallelSlot *slot)
static PGconn * connectDatabase(const char *dbname, const char *connstr, const char *pghost, const char *pgport, const char *pguser, trivalue prompt_password, bool fail_on_error)
static void executeCommand(PGconn *conn, const char *query)
ParallelSlotResultHandler handler
#define select(n, r, w, e, timeout)