PostgreSQL Source Code  git master
parallel_slot.h File Reference
#include "fe_utils/connect_utils.h"
#include "libpq-fe.h"
Include dependency graph for parallel_slot.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  ParallelSlot
 
struct  ParallelSlotArray
 

Typedefs

typedef bool(* ParallelSlotResultHandler) (PGresult *res, PGconn *conn, void *context)
 
typedef struct ParallelSlot ParallelSlot
 
typedef struct ParallelSlotArray ParallelSlotArray
 

Functions

static void ParallelSlotSetHandler (ParallelSlot *slot, ParallelSlotResultHandler handler, void *context)
 
static void ParallelSlotClearHandler (ParallelSlot *slot)
 
ParallelSlotParallelSlotsGetIdle (ParallelSlotArray *slots, const char *dbname)
 
ParallelSlotArrayParallelSlotsSetup (int numslots, ConnParams *cparams, const char *progname, bool echo, const char *initcmd)
 
void ParallelSlotsAdoptConn (ParallelSlotArray *sa, PGconn *conn)
 
void ParallelSlotsTerminate (ParallelSlotArray *sa)
 
bool ParallelSlotsWaitCompletion (ParallelSlotArray *sa)
 
bool TableCommandResultHandler (PGresult *res, PGconn *conn, void *context)
 

Typedef Documentation

◆ ParallelSlot

typedef struct ParallelSlot ParallelSlot

◆ ParallelSlotArray

◆ ParallelSlotResultHandler

typedef bool(* ParallelSlotResultHandler) (PGresult *res, PGconn *conn, void *context)

Definition at line 18 of file parallel_slot.h.

Function Documentation

◆ ParallelSlotClearHandler()

static void ParallelSlotClearHandler ( ParallelSlot slot)
inlinestatic

◆ ParallelSlotsAdoptConn()

void ParallelSlotsAdoptConn ( ParallelSlotArray sa,
PGconn conn 
)

Definition at line 432 of file parallel_slot.c.

References conn, ParallelSlot::connection, disconnectDatabase(), find_unconnected_slot(), and ParallelSlotArray::slots.

Referenced by main(), ParallelSlotClearHandler(), reindex_one_database(), and vacuum_one_database().

433 {
434  int offset;
435 
436  offset = find_unconnected_slot(sa);
437  if (offset >= 0)
438  sa->slots[offset].connection = conn;
439  else
440  disconnectDatabase(conn);
441 }
static int find_unconnected_slot(const ParallelSlotArray *sa)
void disconnectDatabase(PGconn *conn)
PGconn * conn
Definition: streamutil.c:54
PGconn * connection
Definition: parallel_slot.h:23
ParallelSlot slots[FLEXIBLE_ARRAY_MEMBER]
Definition: parallel_slot.h:43

◆ ParallelSlotSetHandler()

static void ParallelSlotSetHandler ( ParallelSlot slot,
ParallelSlotResultHandler  handler,
void *  context 
)
inlinestatic

Definition at line 47 of file parallel_slot.h.

References ParallelSlot::handler, and ParallelSlot::handler_context.

Referenced by main(), reindex_one_database(), and vacuum_one_database().

49 {
50  slot->handler = handler;
51  slot->handler_context = context;
52 }
ParallelSlotResultHandler handler
Definition: parallel_slot.h:32
void * handler_context
Definition: parallel_slot.h:33

◆ ParallelSlotsGetIdle()

ParallelSlot* ParallelSlotsGetIdle ( ParallelSlotArray slots,
const char *  dbname 
)

Definition at line 343 of file parallel_slot.c.

References Assert, connect_slot(), ParallelSlot::connection, disconnectDatabase(), find_any_idle_slot(), find_matching_idle_slot(), find_unconnected_slot(), ParallelSlot::inUse, ParallelSlotArray::numslots, ParallelSlotArray::slots, and wait_on_slots().

Referenced by main(), ParallelSlotClearHandler(), reindex_one_database(), and vacuum_one_database().

344 {
345  int offset;
346 
347  Assert(sa);
348  Assert(sa->numslots > 0);
349 
350  while (1)
351  {
352  /* First choice: a slot already connected to the desired database. */
353  offset = find_matching_idle_slot(sa, dbname);
354  if (offset >= 0)
355  {
356  sa->slots[offset].inUse = true;
357  return &sa->slots[offset];
358  }
359 
360  /* Second choice: a slot not connected to any database. */
361  offset = find_unconnected_slot(sa);
362  if (offset >= 0)
363  {
364  connect_slot(sa, offset, dbname);
365  sa->slots[offset].inUse = true;
366  return &sa->slots[offset];
367  }
368 
369  /* Third choice: a slot connected to the wrong database. */
370  offset = find_any_idle_slot(sa);
371  if (offset >= 0)
372  {
373  disconnectDatabase(sa->slots[offset].connection);
374  sa->slots[offset].connection = NULL;
375  connect_slot(sa, offset, dbname);
376  sa->slots[offset].inUse = true;
377  return &sa->slots[offset];
378  }
379 
380  /*
381  * Fourth choice: block until one or more slots become available. If
382  * any slots hit a fatal error, we'll find out about that here and
383  * return NULL.
384  */
385  if (!wait_on_slots(sa))
386  return NULL;
387  }
388 }
static void connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
static int find_unconnected_slot(const ParallelSlotArray *sa)
void disconnectDatabase(PGconn *conn)
static int find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
#define Assert(condition)
Definition: c.h:804
char * dbname
Definition: streamutil.c:51
static int find_any_idle_slot(const ParallelSlotArray *sa)
static bool wait_on_slots(ParallelSlotArray *sa)

◆ ParallelSlotsSetup()

ParallelSlotArray* ParallelSlotsSetup ( int  numslots,
ConnParams cparams,
const char *  progname,
bool  echo,
const char *  initcmd 
)

Definition at line 400 of file parallel_slot.c.

References Assert, ParallelSlotArray::cparams, ParallelSlotArray::echo, ParallelSlotArray::initcmd, ParallelSlotArray::numslots, offsetof, palloc0(), ParallelSlotArray::progname, and progname.

Referenced by main(), ParallelSlotClearHandler(), reindex_one_database(), and vacuum_one_database().

402 {
404 
405  Assert(numslots > 0);
406  Assert(cparams != NULL);
407  Assert(progname != NULL);
408 
410  numslots * sizeof(ParallelSlot));
411 
412  sa->numslots = numslots;
413  sa->cparams = cparams;
414  sa->progname = progname;
415  sa->echo = echo;
416  sa->initcmd = initcmd;
417 
418  return sa;
419 }
const char * progname
Definition: main.c:46
struct ParallelSlot ParallelSlot
Definition: parallel.h:52
const char * initcmd
Definition: parallel_slot.h:42
void * palloc0(Size size)
Definition: mcxt.c:1093
#define Assert(condition)
Definition: c.h:804
const char * progname
Definition: parallel_slot.h:40
ConnParams * cparams
Definition: parallel_slot.h:39
#define offsetof(type, field)
Definition: c.h:727

◆ ParallelSlotsTerminate()

void ParallelSlotsTerminate ( ParallelSlotArray sa)

Definition at line 451 of file parallel_slot.c.

References conn, ParallelSlot::connection, disconnectDatabase(), i, ParallelSlotArray::numslots, and ParallelSlotArray::slots.

Referenced by main(), ParallelSlotClearHandler(), reindex_one_database(), and vacuum_one_database().

452 {
453  int i;
454 
455  for (i = 0; i < sa->numslots; i++)
456  {
457  PGconn *conn = sa->slots[i].connection;
458 
459  if (conn == NULL)
460  continue;
461 
462  disconnectDatabase(conn);
463  }
464 }
void disconnectDatabase(PGconn *conn)
PGconn * conn
Definition: streamutil.c:54
PGconn * connection
Definition: parallel_slot.h:23
int i
ParallelSlot slots[FLEXIBLE_ARRAY_MEMBER]
Definition: parallel_slot.h:43

◆ ParallelSlotsWaitCompletion()

bool ParallelSlotsWaitCompletion ( ParallelSlotArray sa)

Definition at line 473 of file parallel_slot.c.

References ParallelSlot::connection, consumeQueryResult(), i, ParallelSlotArray::numslots, and ParallelSlotArray::slots.

Referenced by main(), ParallelSlotClearHandler(), reindex_one_database(), and vacuum_one_database().

474 {
475  int i;
476 
477  for (i = 0; i < sa->numslots; i++)
478  {
479  if (sa->slots[i].connection == NULL)
480  continue;
481  if (!consumeQueryResult(&sa->slots[i]))
482  return false;
483  }
484 
485  return true;
486 }
PGconn * connection
Definition: parallel_slot.h:23
static bool consumeQueryResult(ParallelSlot *slot)
Definition: parallel_slot.c:60
int i
ParallelSlot slots[FLEXIBLE_ARRAY_MEMBER]
Definition: parallel_slot.h:43

◆ TableCommandResultHandler()

bool TableCommandResultHandler ( PGresult res,
PGconn conn,
void *  context 
)

Definition at line 509 of file parallel_slot.c.

References Assert, ERRCODE_UNDEFINED_TABLE, PG_DIAG_SQLSTATE, pg_log_error, PGRES_COMMAND_OK, PQclear(), PQdb(), PQerrorMessage(), PQresultErrorField(), and PQresultStatus().

Referenced by ParallelSlotClearHandler(), reindex_one_database(), and vacuum_one_database().

510 {
511  Assert(res != NULL);
512  Assert(conn != NULL);
513 
514  /*
515  * If it's an error, report it. Errors about a missing table are harmless
516  * so we continue processing; but die for other errors.
517  */
518  if (PQresultStatus(res) != PGRES_COMMAND_OK)
519  {
520  char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
521 
522  pg_log_error("processing of database \"%s\" failed: %s",
523  PQdb(conn), PQerrorMessage(conn));
524 
525  if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
526  {
527  PQclear(res);
528  return false;
529  }
530  }
531 
532  return true;
533 }
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:6744
#define pg_log_error(...)
Definition: logging.h:80
#define PG_DIAG_SQLSTATE
Definition: postgres_ext.h:57
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
#define ERRCODE_UNDEFINED_TABLE
Definition: parallel_slot.c:30
void PQclear(PGresult *res)
Definition: fe-exec.c:694
char * PQdb(const PGconn *conn)
Definition: fe-connect.c:6590
char * PQresultErrorField(const PGresult *res, int fieldcode)
Definition: fe-exec.c:3233
#define Assert(condition)
Definition: c.h:804