15#if defined(WIN32) && FD_SETSIZE < 1024
16#error FD_SETSIZE needs to have been increased
28#define ERRCODE_UNDEFINED_TABLE "42P01"
96 struct timeval tv = {0, 1000000};
139 for (
i = 0;
i <
sa->numslots;
i++)
141 if (
sa->slots[
i].inUse)
144 if (
sa->slots[
i].connection ==
NULL)
163 for (
i = 0;
i <
sa->numslots;
i++)
165 if (
sa->slots[
i].inUse)
168 if (
sa->slots[
i].connection ==
NULL)
183 for (
i = 0;
i <
sa->numslots;
i++)
184 if (!
sa->slots[
i].inUse)
206 for (
i = 0;
i <
sa->numslots;
i++)
246 for (
i = 0;
i <
sa->numslots;
i++)
293 sa->cparams->override_dbname =
dbname;
325 pg_log_error(
"socket file descriptor out of range for select(): %d",
383 sa->slots[offset].inUse =
true;
384 return &
sa->slots[offset];
392 sa->slots[offset].inUse =
true;
393 return &
sa->slots[offset];
401 sa->slots[offset].connection =
NULL;
403 sa->slots[offset].inUse =
true;
404 return &
sa->slots[offset];
428 bool echo,
const char *initcmd)
439 sa->numslots = numslots;
440 sa->cparams = cparams;
443 sa->initcmd = initcmd;
465 sa->slots[offset].connection =
conn;
482 for (
i = 0;
i <
sa->numslots;
i++)
504 for (
i = 0;
i <
sa->numslots;
i++)
506 if (
sa->slots[
i].connection ==
NULL)
551 pg_log_error(
"processing of database \"%s\" failed: %s",
#define Assert(condition)
volatile sig_atomic_t CancelRequested
void ResetCancelConn(void)
void SetCancelConn(PGconn *conn)
void disconnectDatabase(PGconn *conn)
PGconn * connectDatabase(const ConnParams *cparams, const char *progname, bool echo, bool fail_ok, bool allow_password_reuse)
char * PQdb(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQconsumeInput(PGconn *conn)
int PQisBusy(PGconn *conn)
#define PQresultErrorField
#define pg_log_error(...)
#define pg_log_error_hint(...)
void * palloc0(Size size)
static bool wait_on_slots(ParallelSlotArray *sa)
ParallelSlotArray * ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
bool ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
static int select_loop(int maxFd, fd_set *workerset)
bool TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
#define ERRCODE_UNDEFINED_TABLE
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
static bool consumeQueryResult(ParallelSlot *slot)
static bool processQueryResult(ParallelSlot *slot, PGresult *result)
ParallelSlot * ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
void ParallelSlotsTerminate(ParallelSlotArray *sa)
static int find_unconnected_slot(const ParallelSlotArray *sa)
void ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
static int find_any_idle_slot(const ParallelSlotArray *sa)
static void ParallelSlotSetIdle(ParallelSlot *slot)
static void executeCommand(PGconn *conn, const char *query)
static int fd(const char *x, int i)
ParallelSlotResultHandler handler
#define select(n, r, w, e, timeout)