PostgreSQL Source Code  git master
parallel.c File Reference
#include "postgres_fe.h"
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include "fe_utils/string_utils.h"
#include "parallel.h"
#include "pg_backup_utils.h"
#include "port/pg_bswap.h"
Include dependency graph for parallel.c:

Go to the source code of this file.

Data Structures

struct  ParallelSlot
 
struct  ShutdownInformation
 
struct  DumpSignalInformation
 

Macros

#define PIPE_READ   0
 
#define PIPE_WRITE   1
 
#define NO_SLOT   (-1) /* Failure result for GetIdleWorker() */
 
#define WORKER_IS_RUNNING(workerStatus)   ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
 
#define pgpipe(a)   pipe(a)
 
#define piperead(a, b, c)   read(a,b,c)
 
#define pipewrite(a, b, c)   write(a,b,c)
 
#define write_stderr(str)
 
#define messageStartsWith(msg, prefix)   (strncmp(msg, prefix, strlen(prefix)) == 0)
 

Typedefs

typedef struct ShutdownInformation ShutdownInformation
 
typedef struct DumpSignalInformation DumpSignalInformation
 

Enumerations

enum  T_WorkerStatus { WRKR_NOT_STARTED = 0, WRKR_IDLE, WRKR_WORKING, WRKR_TERMINATED }
 

Functions

static ParallelSlot * GetMyPSlot (ParallelState *pstate)
 
static void archive_close_connection (int code, void *arg)
 
static void ShutdownWorkersHard (ParallelState *pstate)
 
static void WaitForTerminatingWorkers (ParallelState *pstate)
 
static void setup_cancel_handler (void)
 
static void set_cancel_pstate (ParallelState *pstate)
 
static void set_cancel_slot_archive (ParallelSlot *slot, ArchiveHandle *AH)
 
static void RunWorker (ArchiveHandle *AH, ParallelSlot *slot)
 
static int GetIdleWorker (ParallelState *pstate)
 
static bool HasEveryWorkerTerminated (ParallelState *pstate)
 
static void lockTableForWorker (ArchiveHandle *AH, TocEntry *te)
 
static void WaitForCommands (ArchiveHandle *AH, int pipefd[2])
 
static bool ListenToWorkers (ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
 
static char * getMessageFromMaster (int pipefd[2])
 
static void sendMessageToMaster (int pipefd[2], const char *str)
 
static int select_loop (int maxFd, fd_set *workerset)
 
static char * getMessageFromWorker (ParallelState *pstate, bool do_wait, int *worker)
 
static void sendMessageToWorker (ParallelState *pstate, int worker, const char *str)
 
static char * readMessageFromPipe (int fd)
 
void init_parallel_dump_utils (void)
 
void on_exit_close_archive (Archive *AHX)
 
static void sigTermHandler (SIGNAL_ARGS)
 
void set_archive_cancel_info (ArchiveHandle *AH, PGconn *conn)
 
ParallelState * ParallelBackupStart (ArchiveHandle *AH)
 
void ParallelBackupEnd (ArchiveHandle *AH, ParallelState *pstate)
 
static void buildWorkerCommand (ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
 
static void parseWorkerCommand (ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
 
static void buildWorkerResponse (ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
 
static int parseWorkerResponse (ArchiveHandle *AH, TocEntry *te, const char *msg)
 
void DispatchJobForTocEntry (ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
 
bool IsEveryWorkerIdle (ParallelState *pstate)
 
void WaitForWorkers (ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
 

Variables

static ShutdownInformation shutdown_info
 
static volatile DumpSignalInformation signal_info
 

Macro Definition Documentation

◆ messageStartsWith

#define messageStartsWith (   msg,
  prefix 
)    (strncmp(msg, prefix, strlen(prefix)) == 0)

Definition at line 228 of file parallel.c.

Referenced by ListenToWorkers(), parseWorkerCommand(), and parseWorkerResponse().

◆ NO_SLOT

#define NO_SLOT   (-1) /* Failure result for GetIdleWorker() */

Definition at line 74 of file parallel.c.

Referenced by DispatchJobForTocEntry(), GetIdleWorker(), and WaitForWorkers().

◆ pgpipe

#define pgpipe (   a)    pipe(a)

Definition at line 139 of file parallel.c.

Referenced by ParallelBackupStart(), and readMessageFromPipe().

◆ PIPE_READ

#define PIPE_READ   0

Definition at line 71 of file parallel.c.

Referenced by getMessageFromMaster(), ParallelBackupStart(), and RunWorker().

◆ PIPE_WRITE

#define PIPE_WRITE   1

Definition at line 72 of file parallel.c.

Referenced by ParallelBackupStart(), RunWorker(), and sendMessageToMaster().

◆ piperead

#define piperead (   a,
  b,
  c 
)    read(a,b,c)

Definition at line 140 of file parallel.c.

Referenced by readMessageFromPipe().

◆ pipewrite

#define pipewrite (   a,
  b,
  c 
)    write(a,b,c)

Definition at line 141 of file parallel.c.

Referenced by sendMessageToMaster(), and sendMessageToWorker().

◆ WORKER_IS_RUNNING

#define WORKER_IS_RUNNING (   workerStatus)    ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)

◆ write_stderr

Typedef Documentation

◆ DumpSignalInformation

◆ ShutdownInformation

Enumeration Type Documentation

◆ T_WorkerStatus

Enumerator
WRKR_NOT_STARTED 
WRKR_IDLE 
WRKR_WORKING 
WRKR_TERMINATED 

Definition at line 77 of file parallel.c.

Function Documentation

◆ archive_close_connection()

static void archive_close_connection ( int  code,
void *  arg 
)
static

Definition at line 358 of file parallel.c.

References ParallelSlot::AH, ShutdownInformation::AHX, closesocket, DisconnectDatabase(), GetMyPSlot(), ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ShutdownInformation::pstate, _archiveHandle::public, and ShutdownWorkersHard().

Referenced by on_exit_close_archive().

/*
 * archive_close_connection: exit callback (referenced by on_exit_close_archive()).
 * arg is the ShutdownInformation.  In parallel mode the master forcibly
 * shuts down the workers via ShutdownWorkersHard() and then closes its own
 * connection; a worker closes only its own connection (and, on WIN32, its
 * pipe sockets).  In non-parallel mode only the master connection is closed.
 */
359 {
360  ShutdownInformation *si = (ShutdownInformation *) arg;
361 
362  if (si->pstate)
363  {
364  /* In parallel mode, must figure out who we are */
365  ParallelSlot *slot = GetMyPSlot(si->pstate);
366 
367  if (!slot)
368  {
369  /*
370  * We're the master. Forcibly shut down workers, then close our
371  * own database connection, if any.
372  */
373  ShutdownWorkersHard(si->pstate);
374 
375  if (si->AHX)
376  DisconnectDatabase(si->AHX);
377  }
378  else
379  {
380  /*
381  * We're a worker. Shut down our own DB connection if any. On
382  * Windows, we also have to close our communication sockets, to
383  * emulate what will happen on Unix when the worker process exits.
384  * (Without this, if this is a premature exit, the master would
385  * fail to detect it because there would be no EOF condition on
386  * the other end of the pipe.)
387  */
388  if (slot->AH)
389  DisconnectDatabase(&(slot->AH->public));
390 
391 #ifdef WIN32
392  closesocket(slot->pipeRevRead);
393  closesocket(slot->pipeRevWrite);
394 #endif
395  }
396  }
397  else
398  {
399  /* Non-parallel operation: just kill the master DB connection */
400  if (si->AHX)
401  DisconnectDatabase(si->AHX);
402  }
403 }
ArchiveHandle * AH
Definition: parallel.c:103
#define closesocket
Definition: port.h:312
static void ShutdownWorkersHard(ParallelState *pstate)
Definition: parallel.c:414
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
Definition: parallel.c:283
int pipeRevRead
Definition: parallel.c:107
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:337
ParallelState * pstate
Definition: parallel.c:150
int pipeRevWrite
Definition: parallel.c:108
void * arg

◆ buildWorkerCommand()

static void buildWorkerCommand ( ArchiveHandle *  AH,
TocEntry *  te,
T_Action  act,
char *  buf,
int  buflen 
)
static

Definition at line 1127 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, Assert, _tocEntry::dumpId, and snprintf.

Referenced by DispatchJobForTocEntry().

/*
 * buildWorkerCommand: format the command string for a worker into buf
 * (bounded by buflen via snprintf): "DUMP <dumpId>" for ACT_DUMP,
 * "RESTORE <dumpId>" for ACT_RESTORE.  Any other action trips Assert(false).
 * parseWorkerCommand() is the matching decoder on the worker side.
 */
1129 {
1130  if (act == ACT_DUMP)
1131  snprintf(buf, buflen, "DUMP %d", te->dumpId);
1132  else if (act == ACT_RESTORE)
1133  snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1134  else
1135  Assert(false);
1136 }
static char * buf
Definition: pg_test_fsync.c:67
#define Assert(condition)
Definition: c.h:738
#define snprintf
Definition: port.h:192

◆ buildWorkerResponse()

static void buildWorkerResponse ( ArchiveHandle *  AH,
TocEntry *  te,
T_Action  act,
int  status,
char *  buf,
int  buflen 
)
static

Definition at line 1175 of file parallel.c.

References _tocEntry::dumpId, Archive::n_errors, _archiveHandle::public, snprintf, and WORKER_IGNORED_ERRORS.

Referenced by WaitForCommands().

/*
 * buildWorkerResponse: format a worker's reply "OK <dumpId> <status> <n_errors>"
 * into buf.  The error count is AH->public.n_errors when status is
 * WORKER_IGNORED_ERRORS and 0 otherwise; parseWorkerResponse() adds it to
 * the master's AH->public.n_errors.
 */
1177 {
1178  snprintf(buf, buflen, "OK %d %d %d",
1179  te->dumpId,
1180  status,
1181  status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1182 }
int n_errors
Definition: pg_backup.h:206
#define WORKER_IGNORED_ERRORS
static char * buf
Definition: pg_test_fsync.c:67
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
#define snprintf
Definition: port.h:192

◆ DispatchJobForTocEntry()

void DispatchJobForTocEntry ( ArchiveHandle *  AH,
ParallelState *  pstate,
TocEntry *  te,
T_Action  act,
ParallelCompletionPtr  callback,
void *  callback_data 
)

Definition at line 1224 of file parallel.c.

References buf, buildWorkerCommand(), ParallelSlot::callback, ParallelSlot::callback_data, GetIdleWorker(), NO_SLOT, ParallelState::parallelSlot, sendMessageToWorker(), ParallelState::te, WaitForWorkers(), WFW_ONE_IDLE, ParallelSlot::workerStatus, and WRKR_WORKING.

Referenced by restore_toc_entries_parallel(), and WriteDataChunks().

/*
 * DispatchJobForTocEntry: hand TOC entry te to an idle worker for action act.
 * Blocks in WaitForWorkers(..., WFW_ONE_IDLE) until GetIdleWorker() finds a
 * free slot, sends the command built by buildWorkerCommand(), then marks the
 * slot WRKR_WORKING.  It records callback/callback_data and pstate->te[worker]
 * so that ListenToWorkers() can run the completion callback for te when the
 * worker replies.
 */
1230 {
1231  int worker;
1232  char buf[256];
1233 
1234  /* Get a worker, waiting if none are idle */
1235  while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1236  WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1237 
1238  /* Construct and send command string */
1239  buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1240 
1241  sendMessageToWorker(pstate, worker, buf);
1242 
1243  /* Remember worker is busy, and which TocEntry it's working on */
1244  pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1245  pstate->parallelSlot[worker].callback = callback;
1246  pstate->parallelSlot[worker].callback_data = callback_data;
1247  pstate->te[worker] = te;
1248 }
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
Definition: parallel.c:1127
void * callback_data
Definition: parallel.c:101
static int GetIdleWorker(ParallelState *pstate)
Definition: parallel.c:1255
#define NO_SLOT
Definition: parallel.c:74
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:48
static char * buf
Definition: pg_test_fsync.c:67
ParallelSlot * parallelSlot
Definition: parallel.h:45
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
Definition: parallel.c:1663
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition: parallel.c:1470
TocEntry ** te
Definition: parallel.h:44
ParallelCompletionPtr callback
Definition: parallel.c:100
T_WorkerStatus workerStatus
Definition: parallel.c:97

◆ GetIdleWorker()

static int GetIdleWorker ( ParallelState *  pstate)
static

Definition at line 1255 of file parallel.c.

References i, NO_SLOT, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by DispatchJobForTocEntry(), and WaitForWorkers().

/*
 * GetIdleWorker: return the index of the first slot whose workerStatus is
 * WRKR_IDLE, or NO_SLOT if no worker is currently idle.
 */
1256 {
1257  int i;
1258 
1259  for (i = 0; i < pstate->numWorkers; i++)
1260  {
1261  if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1262  return i;
1263  }
1264  return NO_SLOT;
1265 }
#define NO_SLOT
Definition: parallel.c:74
ParallelSlot * parallelSlot
Definition: parallel.h:45
T_WorkerStatus workerStatus
Definition: parallel.c:97
int i
int numWorkers
Definition: parallel.h:42

◆ getMessageFromMaster()

static char * getMessageFromMaster ( int  pipefd[2])
static

Definition at line 1535 of file parallel.c.

References PIPE_READ, and readMessageFromPipe().

Referenced by WaitForCommands().

/*
 * getMessageFromMaster: read one message from pipefd[PIPE_READ] and return
 * whatever readMessageFromPipe() returns.  Called from WaitForCommands();
 * presumably pipefd is the worker's end of the master->worker channel --
 * confirm against RunWorker().
 */
1536 {
1537  return readMessageFromPipe(pipefd[PIPE_READ]);
1538 }
#define PIPE_READ
Definition: parallel.c:71
static char * readMessageFromPipe(int fd)
Definition: parallel.c:1681

◆ getMessageFromWorker()

static char * getMessageFromWorker ( ParallelState *  pstate,
bool  do_wait,
int *  worker 
)
static

Definition at line 1598 of file parallel.c.

References Assert, fatal, i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, readMessageFromPipe(), select, select_loop(), WORKER_IS_RUNNING, and ParallelSlot::workerStatus.

Referenced by ListenToWorkers().

/*
 * getMessageFromWorker: master side.  Builds an fd_set of the pipeRead
 * descriptors of all running workers (WRKR_IDLE or WRKR_WORKING).  With
 * do_wait it blocks in select_loop(); otherwise it polls with a zero-timeout
 * select() and returns NULL if nothing is ready.  A select() failure is
 * fatal.  For the first ready worker it stores the index in *worker and
 * returns readMessageFromPipe()'s result, which is NULL on EOF.
 */
1599 {
1600  int i;
1601  fd_set workerset;
1602  int maxFd = -1;
1603  struct timeval nowait = {0, 0};
1604 
1605  /* construct bitmap of socket descriptors for select() */
1606  FD_ZERO(&workerset);
1607  for (i = 0; i < pstate->numWorkers; i++)
1608  {
1609  if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1610  continue;
1611  FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1612  if (pstate->parallelSlot[i].pipeRead > maxFd)
1613  maxFd = pstate->parallelSlot[i].pipeRead;
1614  }
1615 
1616  if (do_wait)
1617  {
1618  i = select_loop(maxFd, &workerset);
1619  Assert(i != 0);
1620  }
1621  else
1622  {
1623  if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1624  return NULL;
1625  }
1626 
1627  if (i < 0)
1628  fatal("select() failed: %m");
1629 
1630  for (i = 0; i < pstate->numWorkers; i++)
1631  {
1632  char *msg;
1633 
1634  if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1635  continue;
1636  if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1637  continue;
1638 
1639  /*
1640  * Read the message if any. If the socket is ready because of EOF,
1641  * we'll return NULL instead (and the socket will stay ready, so the
1642  * condition will persist).
1643  *
1644  * Note: because this is a blocking read, we'll wait if only part of
1645  * the message is available. Waiting a long time would be bad, but
1646  * since worker status messages are short and are always sent in one
1647  * operation, it shouldn't be a problem in practice.
1648  */
1649  msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1650  *worker = i;
1651  return msg;
1652  }
1653  Assert(false);
1654  return NULL;
1655 }
int pipeRead
Definition: parallel.c:105
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel.c:1559
static char * readMessageFromPipe(int fd)
Definition: parallel.c:1681
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
ParallelSlot * parallelSlot
Definition: parallel.h:45
#define Assert(condition)
Definition: c.h:738
#define fatal(...)
T_WorkerStatus workerStatus
Definition: parallel.c:97
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:85
static bool do_wait
Definition: pg_ctl.c:79
int i
int numWorkers
Definition: parallel.h:42

◆ GetMyPSlot()

static ParallelSlot * GetMyPSlot ( ParallelState *  pstate)
static

Definition at line 283 of file parallel.c.

References createPQExpBuffer(), i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, and resetPQExpBuffer().

Referenced by archive_close_connection().

/*
 * GetMyPSlot: find the ParallelSlot of the calling worker by matching the
 * current thread id (WIN32) or process id (elsewhere).  Returns NULL when no
 * slot matches; archive_close_connection() treats that as "we're the master".
 */
284 {
285  int i;
286 
287  for (i = 0; i < pstate->numWorkers; i++)
288  {
289 #ifdef WIN32
290  if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
291 #else
292  if (pstate->parallelSlot[i].pid == getpid())
293 #endif
294  return &(pstate->parallelSlot[i]);
295  }
296 
297  return NULL;
298 }
pid_t pid
Definition: parallel.c:115
ParallelSlot * parallelSlot
Definition: parallel.h:45
int i
int numWorkers
Definition: parallel.h:42

◆ HasEveryWorkerTerminated()

static bool HasEveryWorkerTerminated ( ParallelState *  pstate)
static

Definition at line 1271 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, WORKER_IS_RUNNING, and ParallelSlot::workerStatus.

Referenced by WaitForTerminatingWorkers().

/*
 * HasEveryWorkerTerminated: true iff no worker is still running, i.e. no slot
 * has status WRKR_IDLE or WRKR_WORKING (see WORKER_IS_RUNNING).  Used by
 * WaitForTerminatingWorkers().
 */
1272 {
1273  int i;
1274 
1275  for (i = 0; i < pstate->numWorkers; i++)
1276  {
1277  if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1278  return false;
1279  }
1280  return true;
1281 }
ParallelSlot * parallelSlot
Definition: parallel.h:45
T_WorkerStatus workerStatus
Definition: parallel.c:97
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:85
int i
int numWorkers
Definition: parallel.h:42

◆ init_parallel_dump_utils()

void init_parallel_dump_utils ( void  )

Definition at line 251 of file parallel.c.

References exit_nicely, on_exit_nicely(), and pg_log_error.

Referenced by main().

/*
 * init_parallel_dump_utils: one-time setup for parallel operation.  All the
 * work is inside #ifdef WIN32: allocate a TLS index, remember the main
 * thread id, start Winsock 2.2 (fatal on failure), and register
 * shutdown_parallel_dump_utils() via on_exit_nicely().  parallel_init_done
 * prevents repeating this.  On non-WIN32 builds the body is empty.
 */
252 {
253 #ifdef WIN32
254  if (!parallel_init_done)
255  {
256  WSADATA wsaData;
257  int err;
258 
259  /* Prepare for threaded operation */
260  tls_index = TlsAlloc();
261  mainThreadId = GetCurrentThreadId();
262 
263  /* Initialize socket access */
264  err = WSAStartup(MAKEWORD(2, 2), &wsaData);
265  if (err != 0)
266  {
267  pg_log_error("WSAStartup failed: %d", err);
268  exit_nicely(1);
269  }
270  /* ... and arrange to shut it down at exit */
271  on_exit_nicely(shutdown_parallel_dump_utils, NULL);
272  parallel_init_done = true;
273  }
274 #endif
275 }
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
#define pg_log_error(...)
Definition: logging.h:79
#define exit_nicely(code)
Definition: pg_dumpall.c:95

◆ IsEveryWorkerIdle()

bool IsEveryWorkerIdle ( ParallelState *  pstate)

Definition at line 1287 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by ParallelBackupEnd(), restore_toc_entries_parallel(), and WaitForWorkers().

/*
 * IsEveryWorkerIdle: true iff every slot's workerStatus is WRKR_IDLE.
 * Returns true when numWorkers is 0, since the loop never runs.
 */
1288 {
1289  int i;
1290 
1291  for (i = 0; i < pstate->numWorkers; i++)
1292  {
1293  if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1294  return false;
1295  }
1296  return true;
1297 }
ParallelSlot * parallelSlot
Definition: parallel.h:45
T_WorkerStatus workerStatus
Definition: parallel.c:97
int i
int numWorkers
Definition: parallel.h:42

◆ ListenToWorkers()

static bool ListenToWorkers ( ArchiveHandle *  AH,
ParallelState *  pstate,
bool  do_wait 
)
static

Definition at line 1417 of file parallel.c.

References ParallelSlot::callback, ParallelSlot::callback_data, fatal, free, getMessageFromWorker(), messageStartsWith, ParallelState::parallelSlot, parseWorkerResponse(), status(), ParallelState::te, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by WaitForWorkers().

/*
 * ListenToWorkers: master side; process at most one worker status message.
 * Returns false only when do_wait is false and no message was available.
 * If do_wait is true, a NULL message means EOF on some pipe, which is
 * reported as a worker death (fatal).  An "OK ..." reply is decoded by
 * parseWorkerResponse(), passed to the slot's completion callback, and the
 * worker is marked WRKR_IDLE.  Any other message is fatal.
 */
1418 {
1419  int worker;
1420  char *msg;
1421 
1422  /* Try to collect a status message */
1423  msg = getMessageFromWorker(pstate, do_wait, &worker);
1424 
1425  if (!msg)
1426  {
1427  /* If do_wait is true, we must have detected EOF on some socket */
1428  if (do_wait)
1429  fatal("a worker process died unexpectedly");
1430  return false;
1431  }
1432 
1433  /* Process it and update our idea of the worker's status */
1434  if (messageStartsWith(msg, "OK "))
1435  {
1436  ParallelSlot *slot = &pstate->parallelSlot[worker];
1437  TocEntry *te = pstate->te[worker];
1438  int status;
1439 
1440  status = parseWorkerResponse(AH, te, msg);
1441  slot->callback(AH, te, status, slot->callback_data);
1442  slot->workerStatus = WRKR_IDLE;
1443  pstate->te[worker] = NULL;
1444  }
1445  else
1446  fatal("invalid message received from worker: \"%s\"",
1447  msg);
1448 
1449  /* Free the string returned from getMessageFromWorker */
1450  free(msg);
1451 
1452  return true;
1453 }
void * callback_data
Definition: parallel.c:101
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
Definition: parallel.c:1598
ParallelSlot * parallelSlot
Definition: parallel.h:45
#define free(a)
Definition: header.h:65
TocEntry ** te
Definition: parallel.h:44
ParallelCompletionPtr callback
Definition: parallel.c:100
#define fatal(...)
T_WorkerStatus workerStatus
Definition: parallel.c:97
static bool do_wait
Definition: pg_ctl.c:79
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
Definition: parallel.c:1190
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
#define messageStartsWith(msg, prefix)
Definition: parallel.c:228

◆ lockTableForWorker()

static void lockTableForWorker ( ArchiveHandle *  AH,
TocEntry *  te 
)
static

Definition at line 1320 of file parallel.c.

References appendPQExpBuffer(), _archiveHandle::connection, createPQExpBuffer(), PQExpBufferData::data, _tocEntry::desc, destroyPQExpBuffer(), fatal, fmtQualifiedId(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), and _tocEntry::tag.

Referenced by WaitForCommands().

/*
 * lockTableForWorker: worker side.  Issue
 * "LOCK TABLE <qualified name> IN ACCESS SHARE MODE NOWAIT" on the worker's
 * connection for te's table; failure to get the lock is fatal.  BLOBS
 * entries are skipped.
 * NOTE(review): qualId comes from fmtQualifiedId() and is reused in the error
 * message after PQexec(); this assumes the buffer behind it is not
 * overwritten in between -- confirm.
 */
1321 {
1322  const char *qualId;
1323  PQExpBuffer query;
1324  PGresult *res;
1325 
1326  /* Nothing to do for BLOBS */
1327  if (strcmp(te->desc, "BLOBS") == 0)
1328  return;
1329 
1330  query = createPQExpBuffer();
1331 
1332  qualId = fmtQualifiedId(te->namespace, te->tag);
1333 
1334  appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1335  qualId);
1336 
1337  res = PQexec(AH->connection, query->data);
1338 
1339  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1340  fatal("could not obtain lock on relation \"%s\"\n"
1341  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1342  "on the table after the pg_dump parent process had gotten the "
1343  "initial ACCESS SHARE lock on the table.", qualId);
1344 
1345  PQclear(res);
1346  destroyPQExpBuffer(query);
1347 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:116
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:74
void PQclear(PGresult *res)
Definition: fe-exec.c:694
const char * fmtQualifiedId(const char *schema, const char *id)
Definition: string_utils.c:145
#define fatal(...)
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939

◆ on_exit_close_archive()

void on_exit_close_archive ( Archive *  AHX)

Definition at line 347 of file parallel.c.

References ShutdownInformation::AHX, archive_close_connection(), and on_exit_nicely().

Referenced by main().

/*
 * on_exit_close_archive: store the archive handle in shutdown_info and
 * register archive_close_connection() with on_exit_nicely(), passing
 * &shutdown_info as its argument, so the connection is closed at exit.
 */
348 {
349  shutdown_info.AHX = AHX;
350  on_exit_nicely(archive_close_connection, &shutdown_info);
351 }
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static ShutdownInformation shutdown_info
Definition: parallel.c:154
static void archive_close_connection(int code, void *arg)
Definition: parallel.c:358

◆ ParallelBackupEnd()

void ParallelBackupEnd ( ArchiveHandle *  AH,
ParallelState *  pstate 
)

Definition at line 1078 of file parallel.c.

References Assert, closesocket, free, i, IsEveryWorkerIdle(), ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, ParallelSlot::pipeWrite, ShutdownInformation::pstate, set_cancel_pstate(), ParallelState::te, and WaitForTerminatingWorkers().

Referenced by _CloseArchive(), and RestoreArchive().

/*
 * ParallelBackupEnd: tear down the workers started by ParallelBackupStart().
 * Returns immediately when numWorkers == 1 (no workers were started).
 * Otherwise it closes the master's pipe ends so the workers exit, waits for
 * them, unlinks pstate from shutdown_info and the cancel handler, and frees
 * pstate and its arrays.  Expects every worker to be idle (Assert).
 */
1079 {
1080  int i;
1081 
1082  /* No work if non-parallel */
1083  if (pstate->numWorkers == 1)
1084  return;
1085 
1086  /* There should not be any unfinished jobs */
1087  Assert(IsEveryWorkerIdle(pstate));
1088 
1089  /* Close the sockets so that the workers know they can exit */
1090  for (i = 0; i < pstate->numWorkers; i++)
1091  {
1092  closesocket(pstate->parallelSlot[i].pipeRead);
1093  closesocket(pstate->parallelSlot[i].pipeWrite);
1094  }
1095 
1096  /* Wait for them to exit */
1097  WaitForTerminatingWorkers(pstate);
1098 
1099  /*
1100  * Unlink pstate from shutdown_info, so the exit handler will not try to
1101  * use it; and likewise unlink from signal_info.
1102  */
1103  shutdown_info.pstate = NULL;
1104  set_cancel_pstate(NULL);
1105 
1106  /* Release state (mere neatnik-ism, since we're about to terminate) */
1107  free(pstate->te);
1108  free(pstate->parallelSlot);
1109  free(pstate);
1110 }
int pipeRead
Definition: parallel.c:105
static void set_cancel_pstate(ParallelState *pstate)
Definition: parallel.c:808
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1287
#define closesocket
Definition: port.h:312
ParallelSlot * parallelSlot
Definition: parallel.h:45
ParallelState * pstate
Definition: parallel.c:150
#define free(a)
Definition: header.h:65
static ShutdownInformation shutdown_info
Definition: parallel.c:154
#define Assert(condition)
Definition: c.h:738
TocEntry ** te
Definition: parallel.h:44
int pipeWrite
Definition: parallel.c:106
int i
static void WaitForTerminatingWorkers(ParallelState *pstate)
Definition: parallel.c:465
int numWorkers
Definition: parallel.h:42

◆ ParallelBackupStart()

ParallelState* ParallelBackupStart ( ArchiveHandle *  AH)

Definition at line 916 of file parallel.c.

References ParallelSlot::AH, DumpSignalInformation::am_worker, Assert, closesocket, _archiveHandle::connection, fatal, getLocalPQExpBuffer, i, ParallelState::numWorkers, Archive::numWorkers, ParallelState::parallelSlot, pg_malloc(), pg_malloc0(), pgpipe, ParallelSlot::pid, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRead, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ParallelSlot::pipeWrite, pqsignal(), ShutdownInformation::pstate, _archiveHandle::public, RunWorker(), set_archive_cancel_info(), set_cancel_pstate(), SIG_IGN, SIGPIPE, ParallelState::te, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by _CloseArchive(), and RestoreArchive().

/*
 * ParallelBackupStart: allocate the ParallelState for AH->public.numWorkers.
 * With a single worker it returns at once, leaving te/parallelSlot NULL.
 * Otherwise it creates a master->worker and a worker->master pipe per slot,
 * then forks a worker process (or, on WIN32, starts a thread) running
 * RunWorker().  It marks each slot WRKR_IDLE, ignores SIGPIPE in the master,
 * re-enables query cancel on the master connection, and finally registers
 * pstate with the cancel handler.
 */
917 {
918  ParallelState *pstate;
919  int i;
920 
921  Assert(AH->public.numWorkers > 0);
922 
923  pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
924 
925  pstate->numWorkers = AH->public.numWorkers;
926  pstate->te = NULL;
927  pstate->parallelSlot = NULL;
928 
929  if (AH->public.numWorkers == 1)
930  return pstate;
931 
932  /* Create status arrays, being sure to initialize all fields to 0 */
933  pstate->te = (TocEntry **)
934  pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
935  pstate->parallelSlot = (ParallelSlot *)
936  pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
937 
938 #ifdef WIN32
939  /* Make fmtId() and fmtQualifiedId() use thread-local storage */
940  getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
941 #endif
942 
943  /*
944  * Set the pstate in shutdown_info, to tell the exit handler that it must
945  * clean up workers as well as the main database connection. But we don't
946  * set this in signal_info yet, because we don't want child processes to
947  * inherit non-NULL signal_info.pstate.
948  */
949  shutdown_info.pstate = pstate;
950 
951  /*
952  * Temporarily disable query cancellation on the master connection. This
953  * ensures that child processes won't inherit valid AH->connCancel
954  * settings and thus won't try to issue cancels against the master's
955  * connection. No harm is done if we fail while it's disabled, because
956  * the master connection is idle at this point anyway.
957  */
958  set_archive_cancel_info(AH, NULL);
959 
960  /* Ensure stdio state is quiesced before forking */
961  fflush(NULL);
962 
963  /* Create desired number of workers */
964  for (i = 0; i < pstate->numWorkers; i++)
965  {
966 #ifdef WIN32
967  WorkerInfo *wi;
968  uintptr_t handle;
969 #else
970  pid_t pid;
971 #endif
972  ParallelSlot *slot = &(pstate->parallelSlot[i]);
973  int pipeMW[2],
974  pipeWM[2];
975 
976  /* Create communication pipes for this worker */
977  if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
978  fatal("could not create communication channels: %m");
979 
980  /* master's ends of the pipes */
981  slot->pipeRead = pipeWM[PIPE_READ];
982  slot->pipeWrite = pipeMW[PIPE_WRITE];
983  /* child's ends of the pipes */
984  slot->pipeRevRead = pipeMW[PIPE_READ];
985  slot->pipeRevWrite = pipeWM[PIPE_WRITE];
986 
987 #ifdef WIN32
988  /* Create transient structure to pass args to worker function */
989  wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
990 
991  wi->AH = AH;
992  wi->slot = slot;
993 
994  handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
995  wi, 0, &(slot->threadId));
996  slot->hThread = handle;
997  slot->workerStatus = WRKR_IDLE;
998 #else /* !WIN32 */
999  pid = fork();
1000  if (pid == 0)
1001  {
1002  /* we are the worker */
1003  int j;
1004 
1005  /* this is needed for GetMyPSlot() */
1006  slot->pid = getpid();
1007 
1008  /* instruct signal handler that we're in a worker now */
1009  signal_info.am_worker = true;
1010 
1011  /* close read end of Worker -> Master */
1012  closesocket(pipeWM[PIPE_READ]);
1013  /* close write end of Master -> Worker */
1014  closesocket(pipeMW[PIPE_WRITE]);
1015 
1016  /*
1017  * Close all inherited fds for communication of the master with
1018  * previously-forked workers.
1019  */
1020  for (j = 0; j < i; j++)
1021  {
1022  closesocket(pstate->parallelSlot[j].pipeRead);
1023  closesocket(pstate->parallelSlot[j].pipeWrite);
1024  }
1025 
1026  /* Run the worker ... */
1027  RunWorker(AH, slot);
1028 
1029  /* We can just exit(0) when done */
1030  exit(0);
1031  }
1032  else if (pid < 0)
1033  {
1034  /* fork failed */
1035  fatal("could not create worker process: %m");
1036  }
1037 
1038  /* In Master after successful fork */
1039  slot->pid = pid;
1040  slot->workerStatus = WRKR_IDLE;
1041 
1042  /* close read end of Master -> Worker */
1043  closesocket(pipeMW[PIPE_READ]);
1044  /* close write end of Worker -> Master */
1045  closesocket(pipeWM[PIPE_WRITE]);
1046 #endif /* WIN32 */
1047  }
1048 
1049  /*
1050  * Having forked off the workers, disable SIGPIPE so that master isn't
1051  * killed if it tries to send a command to a dead worker. We don't want
1052  * the workers to inherit this setting, though.
1053  */
1054 #ifndef WIN32
1055  pqsignal(SIGPIPE, SIG_IGN);
1056 #endif
1057 
1058  /*
1059  * Re-establish query cancellation on the master connection.
1060  */
1061  set_archive_cancel_info(AH, AH->connection);
1062 
1063  /*
1064  * Tell the cancel signal handler to forward signals to worker processes,
1065  * too. (As with query cancel, we did not need this earlier because the
1066  * workers have not yet been given anything to do; if we die before this
1067  * point, any already-started workers will see EOF and quit promptly.)
1068  */
1069  set_cancel_pstate(pstate);
1070 
1071  return pstate;
1072 }
int pipeRead
Definition: parallel.c:105
static void set_cancel_pstate(ParallelState *pstate)
Definition: parallel.c:808
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define closesocket
Definition: port.h:312
#define PIPE_READ
Definition: parallel.c:71
#define SIGPIPE
Definition: win32_port.h:158
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
pid_t pid
Definition: parallel.c:115
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
Definition: parallel.c:848
int pipeRevRead
Definition: parallel.c:107
#define pgpipe(a)
Definition: parallel.c:139
#define SIG_IGN
Definition: win32_port.h:150
ParallelSlot * parallelSlot
Definition: parallel.h:45
ParallelState * pstate
Definition: parallel.c:150
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static ShutdownInformation shutdown_info
Definition: parallel.c:154
#define Assert(condition)
Definition: c.h:738
TocEntry ** te
Definition: parallel.h:44
int pipeRevWrite
Definition: parallel.c:108
PQExpBuffer(* getLocalPQExpBuffer)(void)
Definition: string_utils.c:27
#define fatal(...)
int numWorkers
Definition: pg_backup.h:193
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175
T_WorkerStatus workerStatus
Definition: parallel.c:97
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
Definition: parallel.c:749
int pipeWrite
Definition: parallel.c:106
int i
#define PIPE_WRITE
Definition: parallel.c:72
int numWorkers
Definition: parallel.h:42

◆ parseWorkerCommand()

static void parseWorkerCommand ( ArchiveHandle *  AH,
TocEntry **  te,
T_Action *  act,
const char *  msg 
)
static

Definition at line 1142 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, Assert, fatal, getTocEntryByDumpId(), and messageStartsWith.

Referenced by WaitForCommands().

/*
 * parseWorkerCommand: worker side; decode a "DUMP <dumpId>" or
 * "RESTORE <dumpId>" command built by buildWorkerCommand().  Sets *act and
 * looks up *te with getTocEntryByDumpId().  Any other message is fatal.
 * NOTE(review): sscanf()'s return value is not checked, and the checks on
 * nBytes and *te are Assert()s only.  If Assert is compiled out, a
 * malformed message would leave dumpId/nBytes unset -- confirm that this is
 * acceptable for master-generated input.
 */
1144 {
1145  DumpId dumpId;
1146  int nBytes;
1147 
1148  if (messageStartsWith(msg, "DUMP "))
1149  {
1150  *act = ACT_DUMP;
1151  sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1152  Assert(nBytes == strlen(msg));
1153  *te = getTocEntryByDumpId(AH, dumpId);
1154  Assert(*te != NULL);
1155  }
1156  else if (messageStartsWith(msg, "RESTORE "))
1157  {
1158  *act = ACT_RESTORE;
1159  sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1160  Assert(nBytes == strlen(msg));
1161  *te = getTocEntryByDumpId(AH, dumpId);
1162  Assert(*te != NULL);
1163  }
1164  else
1165  fatal("unrecognized command received from master: \"%s\"",
1166  msg);
1167 }
int DumpId
Definition: pg_backup.h:234
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
#define Assert(condition)
Definition: c.h:738
#define fatal(...)
#define messageStartsWith(msg, prefix)
Definition: parallel.c:228

◆ parseWorkerResponse()

static int parseWorkerResponse ( ArchiveHandle *  AH,
TocEntry *  te,
const char *  msg 
)
static

Definition at line 1190 of file parallel.c.

References Assert, _tocEntry::dumpId, fatal, messageStartsWith, Archive::n_errors, _archiveHandle::public, and status().

Referenced by ListenToWorkers().

1192 {
1193  DumpId dumpId;
1194  int nBytes,
1195  n_errors;
1196  int status = 0;
1197 
1198  if (messageStartsWith(msg, "OK "))
1199  {
1200  sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1201 
1202  Assert(dumpId == te->dumpId);
1203  Assert(nBytes == strlen(msg));
1204 
1205  AH->public.n_errors += n_errors;
1206  }
1207  else
1208  fatal("invalid message received from worker: \"%s\"",
1209  msg);
1210 
1211  return status;
1212 }
int DumpId
Definition: pg_backup.h:234
int n_errors
Definition: pg_backup.h:206
#define Assert(condition)
Definition: c.h:738
#define fatal(...)
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
#define messageStartsWith(msg, prefix)
Definition: parallel.c:228

◆ readMessageFromPipe()

static char * readMessageFromPipe ( int  fd)
static

Definition at line 1681 of file parallel.c.

References accept, Assert, bind, buf, closesocket, connect, listen, pg_free(), pg_hton16, pg_hton32, pg_log_error, pg_malloc(), pg_realloc(), PGINVALID_SOCKET, pgpipe, piperead, recv, and socket.

Referenced by getMessageFromMaster(), and getMessageFromWorker().

/*
 * Read one '\0'-terminated message from the given pipe descriptor.
 *
 * Returns a pg_malloc'd buffer holding the message (terminator included),
 * which the caller must free, or NULL if piperead() reports an error or the
 * peer closes the connection before a terminator arrives.
 *
 * Bytes are read one at a time, so we can never swallow the beginning of a
 * following message.  That is slow, but the messages exchanged here are
 * short command and status strings.
 */
static char *
readMessageFromPipe(int fd)
{
	int			cap = 64;		/* initial buffer size; any number works */
	int			len = 0;
	char	   *buffer = (char *) pg_malloc(cap);

	for (;;)
	{
		int			got;

		Assert(len < cap);
		got = piperead(fd, buffer + len, 1);
		if (got <= 0)
			break;				/* read error or connection closed */

		Assert(got == 1);

		/* Terminator seen: the message is complete. */
		if (buffer[len] == '\0')
			return buffer;

		/* Keep room for the next byte. */
		if (++len == cap)
		{
			cap += 16;			/* growth step; any number works */
			buffer = (char *) pg_realloc(buffer, cap);
		}
	}

	/* No complete message could be read. */
	pg_free(buffer);
	return NULL;
}
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
#define Assert(condition)
Definition: c.h:738
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define piperead(a, b, c)
Definition: parallel.c:140

◆ RunWorker()

static void RunWorker ( ArchiveHandle *  AH,
ParallelSlot *  slot 
)
static

Definition at line 848 of file parallel.c.

References ParallelSlot::AH, CloneArchive(), DeCloneArchive(), DisconnectDatabase(), free, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, _archiveHandle::public, set_cancel_slot_archive(), _archiveHandle::SetupWorkerPtr, and WaitForCommands().

Referenced by ParallelBackupStart().

849 {
850  int pipefd[2];
851 
852  /* fetch child ends of pipes */
853  pipefd[PIPE_READ] = slot->pipeRevRead;
854  pipefd[PIPE_WRITE] = slot->pipeRevWrite;
855 
856  /*
857  * Clone the archive so that we have our own state to work with, and in
858  * particular our own database connection.
859  *
860  * We clone on Unix as well as Windows, even though technically we don't
861  * need to because fork() gives us a copy in our own address space
862  * already. But CloneArchive resets the state information and also clones
863  * the database connection which both seem kinda helpful.
864  */
865  AH = CloneArchive(AH);
866 
867  /* Remember cloned archive where signal handler can find it */
868  set_cancel_slot_archive(slot, AH);
869 
870  /*
871  * Call the setup worker function that's defined in the ArchiveHandle.
872  */
873  (AH->SetupWorkerPtr) ((Archive *) AH);
874 
875  /*
876  * Execute commands until done.
877  */
878  WaitForCommands(AH, pipefd);
879 
880  /*
881  * Disconnect from database and clean up.
882  */
883  set_cancel_slot_archive(slot, NULL);
884  DisconnectDatabase(&(AH->public));
885  DeCloneArchive(AH);
886 }
#define PIPE_READ
Definition: parallel.c:71
SetupWorkerPtrType SetupWorkerPtr
void DeCloneArchive(ArchiveHandle *AH)
int pipeRevRead
Definition: parallel.c:107
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:337
int pipeRevWrite
Definition: parallel.c:108
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
Definition: parallel.c:828
#define PIPE_WRITE
Definition: parallel.c:72
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
Definition: parallel.c:1355

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset 
)
static

Definition at line 1559 of file parallel.c.

References EINTR, i, and select.

Referenced by getMessageFromWorker().

/*
 * Block in select() until some descriptor in *workerset (all below
 * maxFd + 1) is readable, retrying whenever select() is interrupted.
 *
 * On return *workerset holds the set produced by the final select() call,
 * and the return value is that call's result.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;

	do
	{
		/* select() overwrites its argument, so restart from the saved set */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
	}
#ifndef WIN32
	while (nready < 0 && errno == EINTR);
#else
	while (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

	return nready;
}
#define select(n, r, w, e, timeout)
Definition: win32_port.h:436
int i
#define EINTR
Definition: win32_port.h:323

◆ sendMessageToMaster()

static void sendMessageToMaster ( int  pipefd[2],
const char *  str 
)
static

Definition at line 1546 of file parallel.c.

References fatal, PIPE_WRITE, and pipewrite.

Referenced by WaitForCommands().

1547 {
1548  int len = strlen(str) + 1;
1549 
1550  if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1551  fatal("could not write to the communication channel: %m");
1552 }
#define fatal(...)
#define pipewrite(a, b, c)
Definition: parallel.c:141
#define PIPE_WRITE
Definition: parallel.c:72

◆ sendMessageToWorker()

static void sendMessageToWorker ( ParallelState *  pstate,
int  worker,
const char *  str 
)
static

Definition at line 1663 of file parallel.c.

References fatal, ParallelState::parallelSlot, ParallelSlot::pipeWrite, and pipewrite.

Referenced by DispatchJobForTocEntry().

1664 {
1665  int len = strlen(str) + 1;
1666 
1667  if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1668  {
1669  fatal("could not write to the communication channel: %m");
1670  }
1671 }
ParallelSlot * parallelSlot
Definition: parallel.h:45
#define fatal(...)
int pipeWrite
Definition: parallel.c:106
#define pipewrite(a, b, c)
Definition: parallel.c:141

◆ set_archive_cancel_info()

void set_archive_cancel_info ( ArchiveHandle *  AH,
PGconn *  conn 
)

Definition at line 749 of file parallel.c.

References ParallelSlot::AH, _archiveHandle::connCancel, DumpSignalInformation::myAH, PQfreeCancel(), PQgetCancel(), and setup_cancel_handler().

Referenced by ConnectDatabase(), DisconnectDatabase(), ParallelBackupStart(), and ReconnectToServer().

750 {
751  PGcancel *oldConnCancel;
752 
753  /*
754  * Activate the interrupt handler if we didn't yet in this process. On
755  * Windows, this also initializes signal_info_lock; therefore it's
756  * important that this happen at least once before we fork off any
757  * threads.
758  */
760 
761  /*
762  * On Unix, we assume that storing a pointer value is atomic with respect
763  * to any possible signal interrupt. On Windows, use a critical section.
764  */
765 
766 #ifdef WIN32
767  EnterCriticalSection(&signal_info_lock);
768 #endif
769 
770  /* Free the old one if we have one */
771  oldConnCancel = AH->connCancel;
772  /* be sure interrupt handler doesn't use pointer while freeing */
773  AH->connCancel = NULL;
774 
775  if (oldConnCancel != NULL)
776  PQfreeCancel(oldConnCancel);
777 
778  /* Set the new one if specified */
779  if (conn)
780  AH->connCancel = PQgetCancel(conn);
781 
782  /*
783  * On Unix, there's only ever one active ArchiveHandle per process, so we
784  * can just set signal_info.myAH unconditionally. On Windows, do that
785  * only in the main thread; worker threads have to make sure their
786  * ArchiveHandle appears in the pstate data, which is dealt with in
787  * RunWorker().
788  */
789 #ifndef WIN32
790  signal_info.myAH = AH;
791 #else
792  if (mainThreadId == GetCurrentThreadId())
793  signal_info.myAH = AH;
794 #endif
795 
796 #ifdef WIN32
797  LeaveCriticalSection(&signal_info_lock);
798 #endif
799 }
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4326
ArchiveHandle * myAH
Definition: parallel.c:167
static void setup_cancel_handler(void)
Definition: parallel.c:627
PGcancel *volatile connCancel
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4303
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175

◆ set_cancel_pstate()

static void set_cancel_pstate ( ParallelState *  pstate)
static

Definition at line 808 of file parallel.c.

References DumpSignalInformation::pstate.

Referenced by ParallelBackupEnd(), and ParallelBackupStart().

809 {
810 #ifdef WIN32
811  EnterCriticalSection(&signal_info_lock);
812 #endif
813 
814  signal_info.pstate = pstate;
815 
816 #ifdef WIN32
817  LeaveCriticalSection(&signal_info_lock);
818 #endif
819 }
ParallelState * pstate
Definition: parallel.c:168
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175

◆ set_cancel_slot_archive()

static void set_cancel_slot_archive ( ParallelSlot *  slot,
ArchiveHandle *  AH 
)
static

Definition at line 828 of file parallel.c.

References ParallelSlot::AH.

Referenced by RunWorker().

829 {
830 #ifdef WIN32
831  EnterCriticalSection(&signal_info_lock);
832 #endif
833 
834  slot->AH = AH;
835 
836 #ifdef WIN32
837  LeaveCriticalSection(&signal_info_lock);
838 #endif
839 }
ArchiveHandle * AH
Definition: parallel.c:103

◆ setup_cancel_handler()

static void setup_cancel_handler ( void  )
static

Definition at line 627 of file parallel.c.

References ParallelSlot::AH, _archiveHandle::connCancel, DumpSignalInformation::handler_set, i, DumpSignalInformation::myAH, ParallelState::numWorkers, ParallelState::parallelSlot, PQcancel(), pqsignal(), progname, DumpSignalInformation::pstate, SIGQUIT, sigTermHandler(), and write_stderr.

Referenced by main(), psql_setup_cancel_handler(), runInitSteps(), and set_archive_cancel_info().

628 {
629  /*
630  * When forking, signal_info.handler_set will propagate into the new
631  * process, but that's fine because the signal handler state does too.
632  */
634  {
635  signal_info.handler_set = true;
636 
637  pqsignal(SIGINT, sigTermHandler);
638  pqsignal(SIGTERM, sigTermHandler);
640  }
641 }
#define SIGQUIT
Definition: win32_port.h:154
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175
static void sigTermHandler(SIGNAL_ARGS)
Definition: parallel.c:564

◆ ShutdownWorkersHard()

static void ShutdownWorkersHard ( ParallelState *  pstate)
static

Definition at line 414 of file parallel.c.

References ParallelSlot::AH, closesocket, _archiveHandle::connCancel, i, kill, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, ParallelSlot::pipeWrite, PQcancel(), and WaitForTerminatingWorkers().

Referenced by archive_close_connection().

415 {
416  int i;
417 
418  /*
419  * Close our write end of the sockets so that any workers waiting for
420  * commands know they can exit. (Note: some of the pipeWrite fields might
421  * still be zero, if we failed to initialize all the workers. Hence, just
422  * ignore errors here.)
423  */
424  for (i = 0; i < pstate->numWorkers; i++)
425  closesocket(pstate->parallelSlot[i].pipeWrite);
426 
427  /*
428  * Force early termination of any commands currently in progress.
429  */
430 #ifndef WIN32
431  /* On non-Windows, send SIGTERM to each worker process. */
432  for (i = 0; i < pstate->numWorkers; i++)
433  {
434  pid_t pid = pstate->parallelSlot[i].pid;
435 
436  if (pid != 0)
437  kill(pid, SIGTERM);
438  }
439 #else
440 
441  /*
442  * On Windows, send query cancels directly to the workers' backends. Use
443  * a critical section to ensure worker threads don't change state.
444  */
445  EnterCriticalSection(&signal_info_lock);
446  for (i = 0; i < pstate->numWorkers; i++)
447  {
448  ArchiveHandle *AH = pstate->parallelSlot[i].AH;
449  char errbuf[1];
450 
451  if (AH != NULL && AH->connCancel != NULL)
452  (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
453  }
454  LeaveCriticalSection(&signal_info_lock);
455 #endif
456 
457  /* Now wait for them to terminate. */
459 }
ArchiveHandle * AH
Definition: parallel.c:103
#define closesocket
Definition: port.h:312
#define kill(pid, sig)
Definition: win32_port.h:426
PGcancel *volatile connCancel
pid_t pid
Definition: parallel.c:115
ParallelSlot * parallelSlot
Definition: parallel.h:45
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4458
int pipeWrite
Definition: parallel.c:106
int i
static void WaitForTerminatingWorkers(ParallelState *pstate)
Definition: parallel.c:465
int numWorkers
Definition: parallel.h:42

◆ sigTermHandler()

static void sigTermHandler ( SIGNAL_ARGS  )
static

Definition at line 564 of file parallel.c.

References DumpSignalInformation::am_worker, _archiveHandle::connCancel, i, kill, DumpSignalInformation::myAH, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, PQcancel(), pqsignal(), progname, DumpSignalInformation::pstate, SIG_IGN, SIGQUIT, and write_stderr.

Referenced by setup_cancel_handler().

565 {
566  int i;
567  char errbuf[1];
568 
569  /*
570  * Some platforms allow delivery of new signals to interrupt an active
571  * signal handler. That could muck up our attempt to send PQcancel, so
572  * disable the signals that setup_cancel_handler enabled.
573  */
574  pqsignal(SIGINT, SIG_IGN);
575  pqsignal(SIGTERM, SIG_IGN);
577 
578  /*
579  * If we're in the master, forward signal to all workers. (It seems best
580  * to do this before PQcancel; killing the master transaction will result
581  * in invalid-snapshot errors from active workers, which maybe we can
582  * quiet by killing workers first.) Ignore any errors.
583  */
584  if (signal_info.pstate != NULL)
585  {
586  for (i = 0; i < signal_info.pstate->numWorkers; i++)
587  {
588  pid_t pid = signal_info.pstate->parallelSlot[i].pid;
589 
590  if (pid != 0)
591  kill(pid, SIGTERM);
592  }
593  }
594 
595  /*
596  * Send QueryCancel if we have a connection to send to. Ignore errors,
597  * there's not much we can do about them anyway.
598  */
599  if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
600  (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
601 
602  /*
603  * Report we're quitting, using nothing more complicated than write(2).
604  * When in parallel operation, only the master process should do this.
605  */
606  if (!signal_info.am_worker)
607  {
608  if (progname)
609  {
611  write_stderr(": ");
612  }
613  write_stderr("terminated by user\n");
614  }
615 
616  /*
617  * And die, using _exit() not exit() because the latter will invoke atexit
618  * handlers that can fail if we interrupted related code.
619  */
620  _exit(1);
621 }
#define SIGQUIT
Definition: win32_port.h:154
ArchiveHandle * myAH
Definition: parallel.c:167
#define write_stderr(str)
Definition: parallel.c:186
#define kill(pid, sig)
Definition: win32_port.h:426
PGcancel *volatile connCancel
const char * progname
Definition: pg_standby.c:36
ParallelState * pstate
Definition: parallel.c:168
pid_t pid
Definition: parallel.c:115
#define SIG_IGN
Definition: win32_port.h:150
ParallelSlot * parallelSlot
Definition: parallel.h:45
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4458
int i
int numWorkers
Definition: parallel.h:42

◆ WaitForCommands()

static void WaitForCommands ( ArchiveHandle *  AH,
int  pipefd[2] 
)
static

Definition at line 1355 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, Assert, buf, buildWorkerResponse(), free, getMessageFromMaster(), lockTableForWorker(), parseWorkerCommand(), sendMessageToMaster(), status(), _archiveHandle::WorkerJobDumpPtr, and _archiveHandle::WorkerJobRestorePtr.

Referenced by RunWorker().

1356 {
1357  char *command;
1358  TocEntry *te;
1359  T_Action act;
1360  int status = 0;
1361  char buf[256];
1362 
1363  for (;;)
1364  {
1365  if (!(command = getMessageFromMaster(pipefd)))
1366  {
1367  /* EOF, so done */
1368  return;
1369  }
1370 
1371  /* Decode the command */
1372  parseWorkerCommand(AH, &te, &act, command);
1373 
1374  if (act == ACT_DUMP)
1375  {
1376  /* Acquire lock on this table within the worker's session */
1377  lockTableForWorker(AH, te);
1378 
1379  /* Perform the dump command */
1380  status = (AH->WorkerJobDumpPtr) (AH, te);
1381  }
1382  else if (act == ACT_RESTORE)
1383  {
1384  /* Perform the restore command */
1385  status = (AH->WorkerJobRestorePtr) (AH, te);
1386  }
1387  else
1388  Assert(false);
1389 
1390  /* Return status to master */
1391  buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1392 
1393  sendMessageToMaster(pipefd, buf);
1394 
1395  /* command was pg_malloc'd and we are responsible for free()ing it. */
1396  free(command);
1397  }
1398 }
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
Definition: parallel.c:1175
static char * getMessageFromMaster(int pipefd[2])
Definition: parallel.c:1535
static char * buf
Definition: pg_test_fsync.c:67
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
Definition: parallel.c:1320
#define free(a)
Definition: header.h:65
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
Definition: parallel.c:1142
#define Assert(condition)
Definition: c.h:738
WorkerJobRestorePtrType WorkerJobRestorePtr
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
WorkerJobDumpPtrType WorkerJobDumpPtr
static void sendMessageToMaster(int pipefd[2], const char *str)
Definition: parallel.c:1546

◆ WaitForTerminatingWorkers()

static void WaitForTerminatingWorkers ( ParallelState *  pstate)
static

Definition at line 465 of file parallel.c.

References Assert, free, HasEveryWorkerTerminated(), ParallelState::numWorkers, ParallelState::parallelSlot, pg_malloc(), ParallelSlot::pid, status(), ParallelState::te, WORKER_IS_RUNNING, ParallelSlot::workerStatus, and WRKR_TERMINATED.

Referenced by ParallelBackupEnd(), and ShutdownWorkersHard().

466 {
467  while (!HasEveryWorkerTerminated(pstate))
468  {
469  ParallelSlot *slot = NULL;
470  int j;
471 
472 #ifndef WIN32
473  /* On non-Windows, use wait() to wait for next worker to end */
474  int status;
475  pid_t pid = wait(&status);
476 
477  /* Find dead worker's slot, and clear the PID field */
478  for (j = 0; j < pstate->numWorkers; j++)
479  {
480  slot = &(pstate->parallelSlot[j]);
481  if (slot->pid == pid)
482  {
483  slot->pid = 0;
484  break;
485  }
486  }
487 #else /* WIN32 */
488  /* On Windows, we must use WaitForMultipleObjects() */
489  HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
490  int nrun = 0;
491  DWORD ret;
492  uintptr_t hThread;
493 
494  for (j = 0; j < pstate->numWorkers; j++)
495  {
497  {
498  lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
499  nrun++;
500  }
501  }
502  ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
503  Assert(ret != WAIT_FAILED);
504  hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
505  free(lpHandles);
506 
507  /* Find dead worker's slot, and clear the hThread field */
508  for (j = 0; j < pstate->numWorkers; j++)
509  {
510  slot = &(pstate->parallelSlot[j]);
511  if (slot->hThread == hThread)
512  {
513  /* For cleanliness, close handles for dead threads */
514  CloseHandle((HANDLE) slot->hThread);
515  slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
516  break;
517  }
518  }
519 #endif /* WIN32 */
520 
521  /* On all platforms, update workerStatus and te[] as well */
522  Assert(j < pstate->numWorkers);
524  pstate->te[j] = NULL;
525  }
526 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static bool HasEveryWorkerTerminated(ParallelState *pstate)
Definition: parallel.c:1271
pid_t pid
Definition: parallel.c:115
ParallelSlot * parallelSlot
Definition: parallel.h:45
#define free(a)
Definition: header.h:65
#define Assert(condition)
Definition: c.h:738
TocEntry ** te
Definition: parallel.h:44
T_WorkerStatus workerStatus
Definition: parallel.c:97
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:85
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:225
int numWorkers
Definition: parallel.h:42

◆ WaitForWorkers()

void WaitForWorkers ( ArchiveHandle *  AH,
ParallelState *  pstate,
WFW_WaitOption  mode 
)

Definition at line 1470 of file parallel.c.

References Assert, do_wait, GetIdleWorker(), IsEveryWorkerIdle(), ListenToWorkers(), NO_SLOT, WFW_ALL_IDLE, WFW_GOT_STATUS, WFW_NO_WAIT, and WFW_ONE_IDLE.

Referenced by DispatchJobForTocEntry(), restore_toc_entries_parallel(), and WriteDataChunks().

1471 {
1472  bool do_wait = false;
1473 
1474  /*
1475  * In GOT_STATUS mode, always block waiting for a message, since we can't
1476  * return till we get something. In other modes, we don't block the first
1477  * time through the loop.
1478  */
1479  if (mode == WFW_GOT_STATUS)
1480  {
1481  /* Assert that caller knows what it's doing */
1482  Assert(!IsEveryWorkerIdle(pstate));
1483  do_wait = true;
1484  }
1485 
1486  for (;;)
1487  {
1488  /*
1489  * Check for status messages, even if we don't need to block. We do
1490  * not try very hard to reap all available messages, though, since
1491  * there's unlikely to be more than one.
1492  */
1493  if (ListenToWorkers(AH, pstate, do_wait))
1494  {
1495  /*
1496  * If we got a message, we are done by definition for GOT_STATUS
1497  * mode, and we can also be certain that there's at least one idle
1498  * worker. So we're done in all but ALL_IDLE mode.
1499  */
1500  if (mode != WFW_ALL_IDLE)
1501  return;
1502  }
1503 
1504  /* Check whether we must wait for new status messages */
1505  switch (mode)
1506  {
1507  case WFW_NO_WAIT:
1508  return; /* never wait */
1509  case WFW_GOT_STATUS:
1510  Assert(false); /* can't get here, because we waited */
1511  break;
1512  case WFW_ONE_IDLE:
1513  if (GetIdleWorker(pstate) != NO_SLOT)
1514  return;
1515  break;
1516  case WFW_ALL_IDLE:
1517  if (IsEveryWorkerIdle(pstate))
1518  return;
1519  break;
1520  }
1521 
1522  /* Loop back, and this time wait for something to happen */
1523  do_wait = true;
1524  }
1525 }
static PgChecksumMode mode
Definition: pg_checksums.c:61
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1287
static int GetIdleWorker(ParallelState *pstate)
Definition: parallel.c:1255
#define NO_SLOT
Definition: parallel.c:74
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
Definition: parallel.c:1417
#define Assert(condition)
Definition: c.h:738
static bool do_wait
Definition: pg_ctl.c:79

Variable Documentation

◆ shutdown_info

ShutdownInformation shutdown_info
static

Definition at line 154 of file parallel.c.

◆ signal_info

volatile DumpSignalInformation signal_info
static

Definition at line 175 of file parallel.c.