PostgreSQL Source Code  git master
parallel.c File Reference
#include "postgres_fe.h"
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include "fe_utils/string_utils.h"
#include "parallel.h"
#include "pg_backup_utils.h"
#include "port/pg_bswap.h"
Include dependency graph for parallel.c:

Go to the source code of this file.

Data Structures

struct  ParallelSlot
 
struct  ShutdownInformation
 
struct  DumpSignalInformation
 

Macros

#define PIPE_READ   0
 
#define PIPE_WRITE   1
 
#define NO_SLOT   (-1) /* Failure result for GetIdleWorker() */
 
#define WORKER_IS_RUNNING(workerStatus)   ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
 
#define pgpipe(a)   pipe(a)
 
#define piperead(a, b, c)   read(a,b,c)
 
#define pipewrite(a, b, c)   write(a,b,c)
 
#define write_stderr(str)
 
#define messageStartsWith(msg, prefix)   (strncmp(msg, prefix, strlen(prefix)) == 0)
 

Typedefs

typedef struct ShutdownInformation ShutdownInformation
 
typedef struct DumpSignalInformation DumpSignalInformation
 

Enumerations

enum  T_WorkerStatus { WRKR_NOT_STARTED = 0, WRKR_IDLE, WRKR_WORKING, WRKR_TERMINATED }
 

Functions

static ParallelSlot * GetMyPSlot (ParallelState *pstate)
 
static void archive_close_connection (int code, void *arg)
 
static void ShutdownWorkersHard (ParallelState *pstate)
 
static void WaitForTerminatingWorkers (ParallelState *pstate)
 
static void setup_cancel_handler (void)
 
static void set_cancel_pstate (ParallelState *pstate)
 
static void set_cancel_slot_archive (ParallelSlot *slot, ArchiveHandle *AH)
 
static void RunWorker (ArchiveHandle *AH, ParallelSlot *slot)
 
static int GetIdleWorker (ParallelState *pstate)
 
static bool HasEveryWorkerTerminated (ParallelState *pstate)
 
static void lockTableForWorker (ArchiveHandle *AH, TocEntry *te)
 
static void WaitForCommands (ArchiveHandle *AH, int pipefd[2])
 
static bool ListenToWorkers (ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
 
static char * getMessageFromLeader (int pipefd[2])
 
static void sendMessageToLeader (int pipefd[2], const char *str)
 
static int select_loop (int maxFd, fd_set *workerset)
 
static char * getMessageFromWorker (ParallelState *pstate, bool do_wait, int *worker)
 
static void sendMessageToWorker (ParallelState *pstate, int worker, const char *str)
 
static char * readMessageFromPipe (int fd)
 
void init_parallel_dump_utils (void)
 
void on_exit_close_archive (Archive *AHX)
 
static void sigTermHandler (SIGNAL_ARGS)
 
void set_archive_cancel_info (ArchiveHandle *AH, PGconn *conn)
 
ParallelState * ParallelBackupStart (ArchiveHandle *AH)
 
void ParallelBackupEnd (ArchiveHandle *AH, ParallelState *pstate)
 
static void buildWorkerCommand (ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
 
static void parseWorkerCommand (ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
 
static void buildWorkerResponse (ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
 
static int parseWorkerResponse (ArchiveHandle *AH, TocEntry *te, const char *msg)
 
void DispatchJobForTocEntry (ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
 
bool IsEveryWorkerIdle (ParallelState *pstate)
 
void WaitForWorkers (ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
 

Variables

static ShutdownInformation shutdown_info
 
static volatile DumpSignalInformation signal_info
 

Macro Definition Documentation

◆ messageStartsWith

#define messageStartsWith (   msg,
  prefix 
)    (strncmp(msg, prefix, strlen(prefix)) == 0)

Definition at line 228 of file parallel.c.

Referenced by ListenToWorkers(), parseWorkerCommand(), and parseWorkerResponse().

◆ NO_SLOT

#define NO_SLOT   (-1) /* Failure result for GetIdleWorker() */

Definition at line 74 of file parallel.c.

Referenced by DispatchJobForTocEntry(), GetIdleWorker(), and WaitForWorkers().

◆ pgpipe

#define pgpipe (   a)    pipe(a)

Definition at line 139 of file parallel.c.

Referenced by ParallelBackupStart(), and readMessageFromPipe().

◆ PIPE_READ

#define PIPE_READ   0

Definition at line 71 of file parallel.c.

Referenced by getMessageFromLeader(), ParallelBackupStart(), and RunWorker().

◆ PIPE_WRITE

#define PIPE_WRITE   1

Definition at line 72 of file parallel.c.

Referenced by ParallelBackupStart(), RunWorker(), and sendMessageToLeader().

◆ piperead

#define piperead (   a,
  b,
  c 
)    read(a,b,c)

Definition at line 140 of file parallel.c.

Referenced by readMessageFromPipe().

◆ pipewrite

#define pipewrite (   a,
  b,
  c 
)    write(a,b,c)

Definition at line 141 of file parallel.c.

Referenced by sendMessageToLeader(), and sendMessageToWorker().

◆ WORKER_IS_RUNNING

#define WORKER_IS_RUNNING (   workerStatus)    ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)

◆ write_stderr

#define write_stderr (   str)

Typedef Documentation

◆ DumpSignalInformation

◆ ShutdownInformation

Enumeration Type Documentation

◆ T_WorkerStatus

Enumerator
WRKR_NOT_STARTED 
WRKR_IDLE 
WRKR_WORKING 
WRKR_TERMINATED 

Definition at line 77 of file parallel.c.

Function Documentation

◆ archive_close_connection()

static void archive_close_connection ( int  code,
void *  arg 
)
static

Definition at line 344 of file parallel.c.

References ParallelSlot::AH, ShutdownInformation::AHX, closesocket, DisconnectDatabase(), GetMyPSlot(), ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ShutdownInformation::pstate, _archiveHandle::public, and ShutdownWorkersHard().

Referenced by on_exit_close_archive().

345 {
347 
348  if (si->pstate)
349  {
350  /* In parallel mode, must figure out who we are */
351  ParallelSlot *slot = GetMyPSlot(si->pstate);
352 
353  if (!slot)
354  {
355  /*
356  * We're the leader. Forcibly shut down workers, then close our
357  * own database connection, if any.
358  */
360 
361  if (si->AHX)
362  DisconnectDatabase(si->AHX);
363  }
364  else
365  {
366  /*
367  * We're a worker. Shut down our own DB connection if any. On
368  * Windows, we also have to close our communication sockets, to
369  * emulate what will happen on Unix when the worker process exits.
370  * (Without this, if this is a premature exit, the leader would
371  * fail to detect it because there would be no EOF condition on
372  * the other end of the pipe.)
373  */
374  if (slot->AH)
375  DisconnectDatabase(&(slot->AH->public));
376 
377 #ifdef WIN32
378  closesocket(slot->pipeRevRead);
379  closesocket(slot->pipeRevWrite);
380 #endif
381  }
382  }
383  else
384  {
385  /* Non-parallel operation: just kill the leader DB connection */
386  if (si->AHX)
387  DisconnectDatabase(si->AHX);
388  }
389 }
ArchiveHandle * AH
Definition: parallel.c:103
#define closesocket
Definition: port.h:333
static void ShutdownWorkersHard(ParallelState *pstate)
Definition: parallel.c:400
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
Definition: parallel.c:269
int pipeRevRead
Definition: parallel.c:107
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:230
ParallelState * pstate
Definition: parallel.c:150
int pipeRevWrite
Definition: parallel.c:108
void * arg

◆ buildWorkerCommand()

static void buildWorkerCommand ( ArchiveHandle * AH,
TocEntry * te,
T_Action  act,
char *  buf,
int  buflen 
)
static

Definition at line 1113 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, Assert, _tocEntry::dumpId, and snprintf.

Referenced by DispatchJobForTocEntry().

1115 {
1116  if (act == ACT_DUMP)
1117  snprintf(buf, buflen, "DUMP %d", te->dumpId);
1118  else if (act == ACT_RESTORE)
1119  snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1120  else
1121  Assert(false);
1122 }
static char * buf
Definition: pg_test_fsync.c:68
#define Assert(condition)
Definition: c.h:804
#define snprintf
Definition: port.h:217

◆ buildWorkerResponse()

static void buildWorkerResponse ( ArchiveHandle * AH,
TocEntry * te,
T_Action  act,
int  status,
char *  buf,
int  buflen 
)
static

Definition at line 1161 of file parallel.c.

References _tocEntry::dumpId, Archive::n_errors, _archiveHandle::public, snprintf, and WORKER_IGNORED_ERRORS.

Referenced by WaitForCommands().

1163 {
1164  snprintf(buf, buflen, "OK %d %d %d",
1165  te->dumpId,
1166  status,
1168 }
int n_errors
Definition: pg_backup.h:215
#define WORKER_IGNORED_ERRORS
static char * buf
Definition: pg_test_fsync.c:68
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
#define snprintf
Definition: port.h:217

◆ DispatchJobForTocEntry()

void DispatchJobForTocEntry ( ArchiveHandle * AH,
ParallelState * pstate,
TocEntry * te,
T_Action  act,
ParallelCompletionPtr  callback,
void *  callback_data 
)

Definition at line 1210 of file parallel.c.

References buf, buildWorkerCommand(), ParallelSlot::callback, ParallelSlot::callback_data, GetIdleWorker(), NO_SLOT, ParallelState::parallelSlot, sendMessageToWorker(), ParallelState::te, WaitForWorkers(), WFW_ONE_IDLE, ParallelSlot::workerStatus, and WRKR_WORKING.

Referenced by restore_toc_entries_parallel(), and WriteDataChunks().

1216 {
1217  int worker;
1218  char buf[256];
1219 
1220  /* Get a worker, waiting if none are idle */
1221  while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1222  WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1223 
1224  /* Construct and send command string */
1225  buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1226 
1227  sendMessageToWorker(pstate, worker, buf);
1228 
1229  /* Remember worker is busy, and which TocEntry it's working on */
1230  pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1231  pstate->parallelSlot[worker].callback = callback;
1232  pstate->parallelSlot[worker].callback_data = callback_data;
1233  pstate->te[worker] = te;
1234 }
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
Definition: parallel.c:1113
void * callback_data
Definition: parallel.c:101
static int GetIdleWorker(ParallelState *pstate)
Definition: parallel.c:1241
#define NO_SLOT
Definition: parallel.c:74
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:48
static char * buf
Definition: pg_test_fsync.c:68
ParallelSlot * parallelSlot
Definition: parallel.h:60
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
Definition: parallel.c:1649
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition: parallel.c:1456
TocEntry ** te
Definition: parallel.h:59
ParallelCompletionPtr callback
Definition: parallel.c:100
T_WorkerStatus workerStatus
Definition: parallel.c:97

◆ GetIdleWorker()

static int GetIdleWorker ( ParallelState * pstate)
static

Definition at line 1241 of file parallel.c.

References i, NO_SLOT, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by DispatchJobForTocEntry(), and WaitForWorkers().

1242 {
1243  int i;
1244 
1245  for (i = 0; i < pstate->numWorkers; i++)
1246  {
1247  if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1248  return i;
1249  }
1250  return NO_SLOT;
1251 }
#define NO_SLOT
Definition: parallel.c:74
ParallelSlot * parallelSlot
Definition: parallel.h:60
T_WorkerStatus workerStatus
Definition: parallel.c:97
int i
int numWorkers
Definition: parallel.h:57

◆ getMessageFromLeader()

static char * getMessageFromLeader ( int  pipefd[2])
static

Definition at line 1521 of file parallel.c.

References PIPE_READ, and readMessageFromPipe().

Referenced by WaitForCommands().

1522 {
1523  return readMessageFromPipe(pipefd[PIPE_READ]);
1524 }
#define PIPE_READ
Definition: parallel.c:71
static char * readMessageFromPipe(int fd)
Definition: parallel.c:1667

◆ getMessageFromWorker()

static char * getMessageFromWorker ( ParallelState * pstate,
bool  do_wait,
int *  worker 
)
static

Definition at line 1584 of file parallel.c.

References Assert, fatal, i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, readMessageFromPipe(), select, select_loop(), WORKER_IS_RUNNING, and ParallelSlot::workerStatus.

Referenced by ListenToWorkers().

1585 {
1586  int i;
1587  fd_set workerset;
1588  int maxFd = -1;
1589  struct timeval nowait = {0, 0};
1590 
1591  /* construct bitmap of socket descriptors for select() */
1592  FD_ZERO(&workerset);
1593  for (i = 0; i < pstate->numWorkers; i++)
1594  {
1595  if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1596  continue;
1597  FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1598  if (pstate->parallelSlot[i].pipeRead > maxFd)
1599  maxFd = pstate->parallelSlot[i].pipeRead;
1600  }
1601 
1602  if (do_wait)
1603  {
1604  i = select_loop(maxFd, &workerset);
1605  Assert(i != 0);
1606  }
1607  else
1608  {
1609  if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1610  return NULL;
1611  }
1612 
1613  if (i < 0)
1614  fatal("%s() failed: %m", "select");
1615 
1616  for (i = 0; i < pstate->numWorkers; i++)
1617  {
1618  char *msg;
1619 
1620  if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1621  continue;
1622  if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1623  continue;
1624 
1625  /*
1626  * Read the message if any. If the socket is ready because of EOF,
1627  * we'll return NULL instead (and the socket will stay ready, so the
1628  * condition will persist).
1629  *
1630  * Note: because this is a blocking read, we'll wait if only part of
1631  * the message is available. Waiting a long time would be bad, but
1632  * since worker status messages are short and are always sent in one
1633  * operation, it shouldn't be a problem in practice.
1634  */
1635  msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1636  *worker = i;
1637  return msg;
1638  }
1639  Assert(false);
1640  return NULL;
1641 }
int pipeRead
Definition: parallel.c:105
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel.c:1545
static char * readMessageFromPipe(int fd)
Definition: parallel.c:1667
#define select(n, r, w, e, timeout)
Definition: win32_port.h:474
ParallelSlot * parallelSlot
Definition: parallel.h:60
#define Assert(condition)
Definition: c.h:804
#define fatal(...)
T_WorkerStatus workerStatus
Definition: parallel.c:97
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:85
static bool do_wait
Definition: pg_ctl.c:79
int i
int numWorkers
Definition: parallel.h:57

◆ GetMyPSlot()

static ParallelSlot * GetMyPSlot ( ParallelState * pstate)
static

Definition at line 269 of file parallel.c.

References createPQExpBuffer(), i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, and resetPQExpBuffer().

Referenced by archive_close_connection().

270 {
271  int i;
272 
273  for (i = 0; i < pstate->numWorkers; i++)
274  {
275 #ifdef WIN32
276  if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
277 #else
278  if (pstate->parallelSlot[i].pid == getpid())
279 #endif
280  return &(pstate->parallelSlot[i]);
281  }
282 
283  return NULL;
284 }
pid_t pid
Definition: parallel.c:115
ParallelSlot * parallelSlot
Definition: parallel.h:60
int i
int numWorkers
Definition: parallel.h:57

◆ HasEveryWorkerTerminated()

static bool HasEveryWorkerTerminated ( ParallelState * pstate)
static

Definition at line 1257 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, WORKER_IS_RUNNING, and ParallelSlot::workerStatus.

Referenced by WaitForTerminatingWorkers().

1258 {
1259  int i;
1260 
1261  for (i = 0; i < pstate->numWorkers; i++)
1262  {
1264  return false;
1265  }
1266  return true;
1267 }
ParallelSlot * parallelSlot
Definition: parallel.h:60
T_WorkerStatus workerStatus
Definition: parallel.c:97
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:85
int i
int numWorkers
Definition: parallel.h:57

◆ init_parallel_dump_utils()

void init_parallel_dump_utils ( void  )

Definition at line 238 of file parallel.c.

References exit_nicely, and pg_log_error.

Referenced by main().

/*
 * One-time process setup for parallel operation.
 *
 * On Windows this allocates a TLS index, records the main thread's ID and
 * initializes Winsock (2.2); a Winsock failure is reported and ends the
 * program with exit code 1.  Subsequent calls do nothing.  On other
 * platforms this function does nothing.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			wsaErr;

	/* Only the first call does any work */
	if (parallel_init_done)
		return;

	/* Prepare for threaded operation */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Initialize socket access */
	wsaErr = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (wsaErr != 0)
	{
		pg_log_error("%s() failed: error code %d", "WSAStartup", wsaErr);
		exit_nicely(1);
	}

	parallel_init_done = true;
#endif
}
#define pg_log_error(...)
Definition: logging.h:80
#define exit_nicely(code)
Definition: pg_dumpall.c:97

◆ IsEveryWorkerIdle()

bool IsEveryWorkerIdle ( ParallelState * pstate)

Definition at line 1273 of file parallel.c.

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by ParallelBackupEnd(), restore_toc_entries_parallel(), and WaitForWorkers().

1274 {
1275  int i;
1276 
1277  for (i = 0; i < pstate->numWorkers; i++)
1278  {
1279  if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1280  return false;
1281  }
1282  return true;
1283 }
ParallelSlot * parallelSlot
Definition: parallel.h:60
T_WorkerStatus workerStatus
Definition: parallel.c:97
int i
int numWorkers
Definition: parallel.h:57

◆ ListenToWorkers()

static bool ListenToWorkers ( ArchiveHandle * AH,
ParallelState * pstate,
bool  do_wait 
)
static

Definition at line 1403 of file parallel.c.

References ParallelSlot::callback, ParallelSlot::callback_data, fatal, free, getMessageFromWorker(), messageStartsWith, ParallelState::parallelSlot, parseWorkerResponse(), status(), ParallelState::te, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by WaitForWorkers().

1404 {
1405  int worker;
1406  char *msg;
1407 
1408  /* Try to collect a status message */
1409  msg = getMessageFromWorker(pstate, do_wait, &worker);
1410 
1411  if (!msg)
1412  {
1413  /* If do_wait is true, we must have detected EOF on some socket */
1414  if (do_wait)
1415  fatal("a worker process died unexpectedly");
1416  return false;
1417  }
1418 
1419  /* Process it and update our idea of the worker's status */
1420  if (messageStartsWith(msg, "OK "))
1421  {
1422  ParallelSlot *slot = &pstate->parallelSlot[worker];
1423  TocEntry *te = pstate->te[worker];
1424  int status;
1425 
1426  status = parseWorkerResponse(AH, te, msg);
1427  slot->callback(AH, te, status, slot->callback_data);
1428  slot->workerStatus = WRKR_IDLE;
1429  pstate->te[worker] = NULL;
1430  }
1431  else
1432  fatal("invalid message received from worker: \"%s\"",
1433  msg);
1434 
1435  /* Free the string returned from getMessageFromWorker */
1436  free(msg);
1437 
1438  return true;
1439 }
void * callback_data
Definition: parallel.c:101
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
Definition: parallel.c:1584
ParallelSlot * parallelSlot
Definition: parallel.h:60
#define free(a)
Definition: header.h:65
TocEntry ** te
Definition: parallel.h:59
ParallelCompletionPtr callback
Definition: parallel.c:100
#define fatal(...)
T_WorkerStatus workerStatus
Definition: parallel.c:97
static bool do_wait
Definition: pg_ctl.c:79
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
Definition: parallel.c:1176
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
#define messageStartsWith(msg, prefix)
Definition: parallel.c:228

◆ lockTableForWorker()

static void lockTableForWorker ( ArchiveHandle * AH,
TocEntry * te 
)
static

Definition at line 1306 of file parallel.c.

References appendPQExpBuffer(), _archiveHandle::connection, createPQExpBuffer(), PQExpBufferData::data, _tocEntry::desc, destroyPQExpBuffer(), fatal, fmtQualifiedId(), PGRES_COMMAND_OK, PQclear(), PQexec(), PQresultStatus(), and _tocEntry::tag.

Referenced by WaitForCommands().

1307 {
1308  const char *qualId;
1309  PQExpBuffer query;
1310  PGresult *res;
1311 
1312  /* Nothing to do for BLOBS */
1313  if (strcmp(te->desc, "BLOBS") == 0)
1314  return;
1315 
1316  query = createPQExpBuffer();
1317 
1318  qualId = fmtQualifiedId(te->namespace, te->tag);
1319 
1320  appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1321  qualId);
1322 
1323  res = PQexec(AH->connection, query->data);
1324 
1325  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1326  fatal("could not obtain lock on relation \"%s\"\n"
1327  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1328  "on the table after the pg_dump parent process had gotten the "
1329  "initial ACCESS SHARE lock on the table.", qualId);
1330 
1331  PQclear(res);
1332  destroyPQExpBuffer(query);
1333 }
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3178
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:116
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:74
void PQclear(PGresult *res)
Definition: fe-exec.c:694
const char * fmtQualifiedId(const char *schema, const char *id)
Definition: string_utils.c:145
#define fatal(...)
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2193

◆ on_exit_close_archive()

void on_exit_close_archive ( Archive * AHX)

Definition at line 333 of file parallel.c.

References ShutdownInformation::AHX, archive_close_connection(), and on_exit_nicely().

Referenced by main().

334 {
335  shutdown_info.AHX = AHX;
337 }
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static ShutdownInformation shutdown_info
Definition: parallel.c:154
static void archive_close_connection(int code, void *arg)
Definition: parallel.c:344

◆ ParallelBackupEnd()

void ParallelBackupEnd ( ArchiveHandle * AH,
ParallelState * pstate 
)

Definition at line 1064 of file parallel.c.

References Assert, closesocket, free, i, IsEveryWorkerIdle(), ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, ParallelSlot::pipeWrite, ShutdownInformation::pstate, set_cancel_pstate(), ParallelState::te, and WaitForTerminatingWorkers().

Referenced by _CloseArchive(), and RestoreArchive().

1065 {
1066  int i;
1067 
1068  /* No work if non-parallel */
1069  if (pstate->numWorkers == 1)
1070  return;
1071 
1072  /* There should not be any unfinished jobs */
1073  Assert(IsEveryWorkerIdle(pstate));
1074 
1075  /* Close the sockets so that the workers know they can exit */
1076  for (i = 0; i < pstate->numWorkers; i++)
1077  {
1078  closesocket(pstate->parallelSlot[i].pipeRead);
1079  closesocket(pstate->parallelSlot[i].pipeWrite);
1080  }
1081 
1082  /* Wait for them to exit */
1083  WaitForTerminatingWorkers(pstate);
1084 
1085  /*
1086  * Unlink pstate from shutdown_info, so the exit handler will not try to
1087  * use it; and likewise unlink from signal_info.
1088  */
1089  shutdown_info.pstate = NULL;
1090  set_cancel_pstate(NULL);
1091 
1092  /* Release state (mere neatnik-ism, since we're about to terminate) */
1093  free(pstate->te);
1094  free(pstate->parallelSlot);
1095  free(pstate);
1096 }
int pipeRead
Definition: parallel.c:105
static void set_cancel_pstate(ParallelState *pstate)
Definition: parallel.c:794
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1273
#define closesocket
Definition: port.h:333
ParallelSlot * parallelSlot
Definition: parallel.h:60
ParallelState * pstate
Definition: parallel.c:150
#define free(a)
Definition: header.h:65
static ShutdownInformation shutdown_info
Definition: parallel.c:154
#define Assert(condition)
Definition: c.h:804
TocEntry ** te
Definition: parallel.h:59
int pipeWrite
Definition: parallel.c:106
int i
static void WaitForTerminatingWorkers(ParallelState *pstate)
Definition: parallel.c:451
int numWorkers
Definition: parallel.h:57

◆ ParallelBackupStart()

ParallelState* ParallelBackupStart ( ArchiveHandle * AH)

Definition at line 902 of file parallel.c.

References ParallelSlot::AH, DumpSignalInformation::am_worker, Assert, closesocket, _archiveHandle::connection, fatal, getLocalPQExpBuffer, i, ParallelState::numWorkers, Archive::numWorkers, ParallelState::parallelSlot, pg_malloc(), pg_malloc0(), pgpipe, ParallelSlot::pid, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRead, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ParallelSlot::pipeWrite, pqsignal(), ShutdownInformation::pstate, _archiveHandle::public, RunWorker(), set_archive_cancel_info(), set_cancel_pstate(), SIG_IGN, SIGPIPE, ParallelState::te, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by _CloseArchive(), and RestoreArchive().

903 {
904  ParallelState *pstate;
905  int i;
906 
907  Assert(AH->public.numWorkers > 0);
908 
909  pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
910 
911  pstate->numWorkers = AH->public.numWorkers;
912  pstate->te = NULL;
913  pstate->parallelSlot = NULL;
914 
915  if (AH->public.numWorkers == 1)
916  return pstate;
917 
918  /* Create status arrays, being sure to initialize all fields to 0 */
919  pstate->te = (TocEntry **)
920  pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
921  pstate->parallelSlot = (ParallelSlot *)
922  pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
923 
924 #ifdef WIN32
925  /* Make fmtId() and fmtQualifiedId() use thread-local storage */
926  getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
927 #endif
928 
929  /*
930  * Set the pstate in shutdown_info, to tell the exit handler that it must
931  * clean up workers as well as the main database connection. But we don't
932  * set this in signal_info yet, because we don't want child processes to
933  * inherit non-NULL signal_info.pstate.
934  */
935  shutdown_info.pstate = pstate;
936 
937  /*
938  * Temporarily disable query cancellation on the leader connection. This
939  * ensures that child processes won't inherit valid AH->connCancel
940  * settings and thus won't try to issue cancels against the leader's
941  * connection. No harm is done if we fail while it's disabled, because
942  * the leader connection is idle at this point anyway.
943  */
944  set_archive_cancel_info(AH, NULL);
945 
946  /* Ensure stdio state is quiesced before forking */
947  fflush(NULL);
948 
949  /* Create desired number of workers */
950  for (i = 0; i < pstate->numWorkers; i++)
951  {
952 #ifdef WIN32
953  WorkerInfo *wi;
954  uintptr_t handle;
955 #else
956  pid_t pid;
957 #endif
958  ParallelSlot *slot = &(pstate->parallelSlot[i]);
959  int pipeMW[2],
960  pipeWM[2];
961 
962  /* Create communication pipes for this worker */
963  if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
964  fatal("could not create communication channels: %m");
965 
966  /* leader's ends of the pipes */
967  slot->pipeRead = pipeWM[PIPE_READ];
968  slot->pipeWrite = pipeMW[PIPE_WRITE];
969  /* child's ends of the pipes */
970  slot->pipeRevRead = pipeMW[PIPE_READ];
971  slot->pipeRevWrite = pipeWM[PIPE_WRITE];
972 
973 #ifdef WIN32
974  /* Create transient structure to pass args to worker function */
975  wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
976 
977  wi->AH = AH;
978  wi->slot = slot;
979 
980  handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
981  wi, 0, &(slot->threadId));
982  slot->hThread = handle;
983  slot->workerStatus = WRKR_IDLE;
984 #else /* !WIN32 */
985  pid = fork();
986  if (pid == 0)
987  {
988  /* we are the worker */
989  int j;
990 
991  /* this is needed for GetMyPSlot() */
992  slot->pid = getpid();
993 
994  /* instruct signal handler that we're in a worker now */
995  signal_info.am_worker = true;
996 
997  /* close read end of Worker -> Leader */
998  closesocket(pipeWM[PIPE_READ]);
999  /* close write end of Leader -> Worker */
1000  closesocket(pipeMW[PIPE_WRITE]);
1001 
1002  /*
1003  * Close all inherited fds for communication of the leader with
1004  * previously-forked workers.
1005  */
1006  for (j = 0; j < i; j++)
1007  {
1008  closesocket(pstate->parallelSlot[j].pipeRead);
1009  closesocket(pstate->parallelSlot[j].pipeWrite);
1010  }
1011 
1012  /* Run the worker ... */
1013  RunWorker(AH, slot);
1014 
1015  /* We can just exit(0) when done */
1016  exit(0);
1017  }
1018  else if (pid < 0)
1019  {
1020  /* fork failed */
1021  fatal("could not create worker process: %m");
1022  }
1023 
1024  /* In Leader after successful fork */
1025  slot->pid = pid;
1026  slot->workerStatus = WRKR_IDLE;
1027 
1028  /* close read end of Leader -> Worker */
1029  closesocket(pipeMW[PIPE_READ]);
1030  /* close write end of Worker -> Leader */
1031  closesocket(pipeWM[PIPE_WRITE]);
1032 #endif /* WIN32 */
1033  }
1034 
1035  /*
1036  * Having forked off the workers, disable SIGPIPE so that leader isn't
1037  * killed if it tries to send a command to a dead worker. We don't want
1038  * the workers to inherit this setting, though.
1039  */
1040 #ifndef WIN32
1042 #endif
1043 
1044  /*
1045  * Re-establish query cancellation on the leader connection.
1046  */
1048 
1049  /*
1050  * Tell the cancel signal handler to forward signals to worker processes,
1051  * too. (As with query cancel, we did not need this earlier because the
1052  * workers have not yet been given anything to do; if we die before this
1053  * point, any already-started workers will see EOF and quit promptly.)
1054  */
1055  set_cancel_pstate(pstate);
1056 
1057  return pstate;
1058 }
int pipeRead
Definition: parallel.c:105
static void set_cancel_pstate(ParallelState *pstate)
Definition: parallel.c:794
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define closesocket
Definition: port.h:333
#define PIPE_READ
Definition: parallel.c:71
#define SIGPIPE
Definition: win32_port.h:172
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
pid_t pid
Definition: parallel.c:115
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
Definition: parallel.c:834
int pipeRevRead
Definition: parallel.c:107
#define pgpipe(a)
Definition: parallel.c:139
#define SIG_IGN
Definition: win32_port.h:164
ParallelSlot * parallelSlot
Definition: parallel.h:60
ParallelState * pstate
Definition: parallel.c:150
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static ShutdownInformation shutdown_info
Definition: parallel.c:154
#define Assert(condition)
Definition: c.h:804
TocEntry ** te
Definition: parallel.h:59
int pipeRevWrite
Definition: parallel.c:108
PQExpBuffer(* getLocalPQExpBuffer)(void)
Definition: string_utils.c:27
#define fatal(...)
int numWorkers
Definition: pg_backup.h:202
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175
T_WorkerStatus workerStatus
Definition: parallel.c:97
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
Definition: parallel.c:735
int pipeWrite
Definition: parallel.c:106
int i
#define PIPE_WRITE
Definition: parallel.c:72
int numWorkers
Definition: parallel.h:57

◆ parseWorkerCommand()

static void parseWorkerCommand ( ArchiveHandle * AH,
TocEntry **  te,
T_Action * act,
const char *  msg 
)
static

Definition at line 1128 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, Assert, fatal, getTocEntryByDumpId(), and messageStartsWith.

Referenced by WaitForCommands().

1130 {
1131  DumpId dumpId;
1132  int nBytes;
1133 
1134  if (messageStartsWith(msg, "DUMP "))
1135  {
1136  *act = ACT_DUMP;
1137  sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1138  Assert(nBytes == strlen(msg));
1139  *te = getTocEntryByDumpId(AH, dumpId);
1140  Assert(*te != NULL);
1141  }
1142  else if (messageStartsWith(msg, "RESTORE "))
1143  {
1144  *act = ACT_RESTORE;
1145  sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1146  Assert(nBytes == strlen(msg));
1147  *te = getTocEntryByDumpId(AH, dumpId);
1148  Assert(*te != NULL);
1149  }
1150  else
1151  fatal("unrecognized command received from leader: \"%s\"",
1152  msg);
1153 }
int DumpId
Definition: pg_backup.h:244
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
#define Assert(condition)
Definition: c.h:804
#define fatal(...)
#define messageStartsWith(msg, prefix)
Definition: parallel.c:228

◆ parseWorkerResponse()

static int parseWorkerResponse (ArchiveHandle *AH,
TocEntry *te,
const char *msg
)
static

Definition at line 1176 of file parallel.c.

References Assert, _tocEntry::dumpId, fatal, messageStartsWith, Archive::n_errors, _archiveHandle::public, and status().

Referenced by ListenToWorkers().

1178 {
1179  DumpId dumpId;
1180  int nBytes,
1181  n_errors;
1182  int status = 0;
1183 
1184  if (messageStartsWith(msg, "OK "))
1185  {
1186  sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1187 
1188  Assert(dumpId == te->dumpId);
1189  Assert(nBytes == strlen(msg));
1190 
1191  AH->public.n_errors += n_errors;
1192  }
1193  else
1194  fatal("invalid message received from worker: \"%s\"",
1195  msg);
1196 
1197  return status;
1198 }
int DumpId
Definition: pg_backup.h:244
int n_errors
Definition: pg_backup.h:215
#define Assert(condition)
Definition: c.h:804
#define fatal(...)
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
#define messageStartsWith(msg, prefix)
Definition: parallel.c:228

◆ readMessageFromPipe()

static char * readMessageFromPipe ( int  fd)
static

Definition at line 1667 of file parallel.c.

References accept, Assert, bind, closesocket, connect, listen, pg_free(), pg_hton16, pg_hton32, pg_log_error, pg_malloc(), pg_realloc(), PGINVALID_SOCKET, pgpipe, piperead, and socket.

Referenced by getMessageFromLeader(), and getMessageFromWorker().

1668 {
1669  char *msg;
1670  int msgsize,
1671  bufsize;
1672  int ret;
1673 
1674  /*
1675  * In theory, if we let piperead() read multiple bytes, it might give us
1676  * back fragments of multiple messages. (That can't actually occur, since
1677  * neither leader nor workers send more than one message without waiting
1678  * for a reply, but we don't wish to assume that here.) For simplicity,
1679  * read a byte at a time until we get the terminating '\0'. This method
1680  * is a bit inefficient, but since this is only used for relatively short
1681  * command and status strings, it shouldn't matter.
1682  */
1683  bufsize = 64; /* could be any number */
1684  msg = (char *) pg_malloc(bufsize);
1685  msgsize = 0;
1686  for (;;)
1687  {
1688  Assert(msgsize < bufsize);
1689  ret = piperead(fd, msg + msgsize, 1);
1690  if (ret <= 0)
1691  break; /* error or connection closure */
1692 
1693  Assert(ret == 1);
1694 
1695  if (msg[msgsize] == '\0')
1696  return msg; /* collected whole message */
1697 
1698  msgsize++;
1699  if (msgsize == bufsize) /* enlarge buffer if needed */
1700  {
1701  bufsize += 16; /* could be any number */
1702  msg = (char *) pg_realloc(msg, bufsize);
1703  }
1704  }
1705 
1706  /* Other end has closed the connection */
1707  pg_free(msg);
1708  return NULL;
1709 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static int fd(const char *x, int i)
Definition: preproc-init.c:105
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
#define Assert(condition)
Definition: c.h:804
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define piperead(a, b, c)
Definition: parallel.c:140

◆ RunWorker()

static void RunWorker (ArchiveHandle *AH,
ParallelSlot *slot
)
static

Definition at line 834 of file parallel.c.

References ParallelSlot::AH, CloneArchive(), DeCloneArchive(), DisconnectDatabase(), free, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, _archiveHandle::public, set_cancel_slot_archive(), _archiveHandle::SetupWorkerPtr, and WaitForCommands().

Referenced by ParallelBackupStart().

835 {
836  int pipefd[2];
837 
838  /* fetch child ends of pipes */
839  pipefd[PIPE_READ] = slot->pipeRevRead;
840  pipefd[PIPE_WRITE] = slot->pipeRevWrite;
841 
842  /*
843  * Clone the archive so that we have our own state to work with, and in
844  * particular our own database connection.
845  *
846  * We clone on Unix as well as Windows, even though technically we don't
847  * need to because fork() gives us a copy in our own address space
848  * already. But CloneArchive resets the state information and also clones
849  * the database connection which both seem kinda helpful.
850  */
851  AH = CloneArchive(AH);
852 
853  /* Remember cloned archive where signal handler can find it */
854  set_cancel_slot_archive(slot, AH);
855 
856  /*
857  * Call the setup worker function that's defined in the ArchiveHandle.
858  */
859  (AH->SetupWorkerPtr) ((Archive *) AH);
860 
861  /*
862  * Execute commands until done.
863  */
864  WaitForCommands(AH, pipefd);
865 
866  /*
867  * Disconnect from database and clean up.
868  */
869  set_cancel_slot_archive(slot, NULL);
870  DisconnectDatabase(&(AH->public));
871  DeCloneArchive(AH);
872 }
#define PIPE_READ
Definition: parallel.c:71
SetupWorkerPtrType SetupWorkerPtr
void DeCloneArchive(ArchiveHandle *AH)
int pipeRevRead
Definition: parallel.c:107
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:230
int pipeRevWrite
Definition: parallel.c:108
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
Definition: parallel.c:814
#define PIPE_WRITE
Definition: parallel.c:72
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
Definition: parallel.c:1341

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  workerset 
)
static

Definition at line 1545 of file parallel.c.

References EINTR, i, and select.

Referenced by getMessageFromWorker().

/*
 * select_loop: block in select() until at least one descriptor in
 * *workerset (all <= maxFd) is readable, or select() fails.
 *
 * An interrupted select() (EINTR, or WSAEINTR on Windows) is retried with
 * the caller's original descriptor set.  On return, *workerset holds the
 * set reported by the last select() call.  The return value is that call's
 * result.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;	/* select() overwrites its input */
	int			nready;

	for (;;)
	{
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		if (nready >= 0 || errno != EINTR)
			return nready;
#else
		if (nready != SOCKET_ERROR || WSAGetLastError() != WSAEINTR)
			return nready;
#endif
		/* interrupted by a signal: try again */
	}
}
#define select(n, r, w, e, timeout)
Definition: win32_port.h:474
int i
#define EINTR
Definition: win32_port.h:351

◆ sendMessageToLeader()

static void sendMessageToLeader ( int  pipefd[2],
const char *  str 
)
static

Definition at line 1532 of file parallel.c.

References fatal, PIPE_WRITE, and pipewrite.

Referenced by WaitForCommands().

1533 {
1534  int len = strlen(str) + 1;
1535 
1536  if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1537  fatal("could not write to the communication channel: %m");
1538 }
#define fatal(...)
#define pipewrite(a, b, c)
Definition: parallel.c:141
#define PIPE_WRITE
Definition: parallel.c:72

◆ sendMessageToWorker()

static void sendMessageToWorker (ParallelState *pstate,
int worker,
const char *str
)
static

Definition at line 1649 of file parallel.c.

References fatal, ParallelState::parallelSlot, ParallelSlot::pipeWrite, and pipewrite.

Referenced by DispatchJobForTocEntry().

1650 {
1651  int len = strlen(str) + 1;
1652 
1653  if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1654  {
1655  fatal("could not write to the communication channel: %m");
1656  }
1657 }
ParallelSlot * parallelSlot
Definition: parallel.h:60
#define fatal(...)
int pipeWrite
Definition: parallel.c:106
#define pipewrite(a, b, c)
Definition: parallel.c:141

◆ set_archive_cancel_info()

void set_archive_cancel_info (ArchiveHandle *AH,
PGconn *conn
)

Definition at line 735 of file parallel.c.

References ParallelSlot::AH, _archiveHandle::connCancel, DumpSignalInformation::myAH, PQfreeCancel(), PQgetCancel(), and setup_cancel_handler().

Referenced by ConnectDatabase(), DisconnectDatabase(), and ParallelBackupStart().

736 {
737  PGcancel *oldConnCancel;
738 
739  /*
740  * Activate the interrupt handler if we didn't yet in this process. On
741  * Windows, this also initializes signal_info_lock; therefore it's
742  * important that this happen at least once before we fork off any
743  * threads.
744  */
746 
747  /*
748  * On Unix, we assume that storing a pointer value is atomic with respect
749  * to any possible signal interrupt. On Windows, use a critical section.
750  */
751 
752 #ifdef WIN32
753  EnterCriticalSection(&signal_info_lock);
754 #endif
755 
756  /* Free the old one if we have one */
757  oldConnCancel = AH->connCancel;
758  /* be sure interrupt handler doesn't use pointer while freeing */
759  AH->connCancel = NULL;
760 
761  if (oldConnCancel != NULL)
762  PQfreeCancel(oldConnCancel);
763 
764  /* Set the new one if specified */
765  if (conn)
766  AH->connCancel = PQgetCancel(conn);
767 
768  /*
769  * On Unix, there's only ever one active ArchiveHandle per process, so we
770  * can just set signal_info.myAH unconditionally. On Windows, do that
771  * only in the main thread; worker threads have to make sure their
772  * ArchiveHandle appears in the pstate data, which is dealt with in
773  * RunWorker().
774  */
775 #ifndef WIN32
776  signal_info.myAH = AH;
777 #else
778  if (mainThreadId == GetCurrentThreadId())
779  signal_info.myAH = AH;
780 #endif
781 
782 #ifdef WIN32
783  LeaveCriticalSection(&signal_info_lock);
784 #endif
785 }
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4375
ArchiveHandle * myAH
Definition: parallel.c:167
static void setup_cancel_handler(void)
Definition: parallel.c:613
PGcancel *volatile connCancel
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4352
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175

◆ set_cancel_pstate()

static void set_cancel_pstate (ParallelState *pstate)
static

Definition at line 794 of file parallel.c.

References DumpSignalInformation::pstate.

Referenced by ParallelBackupEnd(), and ParallelBackupStart().

795 {
796 #ifdef WIN32
797  EnterCriticalSection(&signal_info_lock);
798 #endif
799 
800  signal_info.pstate = pstate;
801 
802 #ifdef WIN32
803  LeaveCriticalSection(&signal_info_lock);
804 #endif
805 }
ParallelState * pstate
Definition: parallel.c:168
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175

◆ set_cancel_slot_archive()

static void set_cancel_slot_archive (ParallelSlot *slot,
ArchiveHandle *AH
)
static

Definition at line 814 of file parallel.c.

References ParallelSlot::AH.

Referenced by RunWorker().

815 {
816 #ifdef WIN32
817  EnterCriticalSection(&signal_info_lock);
818 #endif
819 
820  slot->AH = AH;
821 
822 #ifdef WIN32
823  LeaveCriticalSection(&signal_info_lock);
824 #endif
825 }
ArchiveHandle * AH
Definition: parallel.c:103

◆ setup_cancel_handler()

static void setup_cancel_handler ( void  )
static

Definition at line 613 of file parallel.c.

References ParallelSlot::AH, _archiveHandle::connCancel, DumpSignalInformation::handler_set, i, DumpSignalInformation::myAH, ParallelState::numWorkers, ParallelState::parallelSlot, PQcancel(), pqsignal(), progname, DumpSignalInformation::pstate, SIGQUIT, sigTermHandler(), and write_stderr.

Referenced by main(), psql_setup_cancel_handler(), runInitSteps(), and set_archive_cancel_info().

614 {
615  /*
616  * When forking, signal_info.handler_set will propagate into the new
617  * process, but that's fine because the signal handler state does too.
618  */
620  {
621  signal_info.handler_set = true;
622 
623  pqsignal(SIGINT, sigTermHandler);
624  pqsignal(SIGTERM, sigTermHandler);
626  }
627 }
#define SIGQUIT
Definition: win32_port.h:168
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175
static void sigTermHandler(SIGNAL_ARGS)
Definition: parallel.c:550

◆ ShutdownWorkersHard()

static void ShutdownWorkersHard (ParallelState *pstate)
static

Definition at line 400 of file parallel.c.

References ParallelSlot::AH, closesocket, _archiveHandle::connCancel, i, kill, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, ParallelSlot::pipeWrite, PQcancel(), and WaitForTerminatingWorkers().

Referenced by archive_close_connection().

401 {
402  int i;
403 
404  /*
405  * Close our write end of the sockets so that any workers waiting for
406  * commands know they can exit. (Note: some of the pipeWrite fields might
407  * still be zero, if we failed to initialize all the workers. Hence, just
408  * ignore errors here.)
409  */
410  for (i = 0; i < pstate->numWorkers; i++)
411  closesocket(pstate->parallelSlot[i].pipeWrite);
412 
413  /*
414  * Force early termination of any commands currently in progress.
415  */
416 #ifndef WIN32
417  /* On non-Windows, send SIGTERM to each worker process. */
418  for (i = 0; i < pstate->numWorkers; i++)
419  {
420  pid_t pid = pstate->parallelSlot[i].pid;
421 
422  if (pid != 0)
423  kill(pid, SIGTERM);
424  }
425 #else
426 
427  /*
428  * On Windows, send query cancels directly to the workers' backends. Use
429  * a critical section to ensure worker threads don't change state.
430  */
431  EnterCriticalSection(&signal_info_lock);
432  for (i = 0; i < pstate->numWorkers; i++)
433  {
434  ArchiveHandle *AH = pstate->parallelSlot[i].AH;
435  char errbuf[1];
436 
437  if (AH != NULL && AH->connCancel != NULL)
438  (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
439  }
440  LeaveCriticalSection(&signal_info_lock);
441 #endif
442 
443  /* Now wait for them to terminate. */
445 }
ArchiveHandle * AH
Definition: parallel.c:103
#define closesocket
Definition: port.h:333
#define kill(pid, sig)
Definition: win32_port.h:464
PGcancel *volatile connCancel
pid_t pid
Definition: parallel.c:115
ParallelSlot * parallelSlot
Definition: parallel.h:60
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4507
int pipeWrite
Definition: parallel.c:106
int i
static void WaitForTerminatingWorkers(ParallelState *pstate)
Definition: parallel.c:451
int numWorkers
Definition: parallel.h:57

◆ sigTermHandler()

static void sigTermHandler ( SIGNAL_ARGS  )
static

Definition at line 550 of file parallel.c.

References DumpSignalInformation::am_worker, _archiveHandle::connCancel, i, kill, DumpSignalInformation::myAH, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, PQcancel(), pqsignal(), progname, DumpSignalInformation::pstate, SIG_IGN, SIGQUIT, and write_stderr.

Referenced by setup_cancel_handler().

551 {
552  int i;
553  char errbuf[1];
554 
555  /*
556  * Some platforms allow delivery of new signals to interrupt an active
557  * signal handler. That could muck up our attempt to send PQcancel, so
558  * disable the signals that setup_cancel_handler enabled.
559  */
560  pqsignal(SIGINT, SIG_IGN);
561  pqsignal(SIGTERM, SIG_IGN);
563 
564  /*
565  * If we're in the leader, forward signal to all workers. (It seems best
566  * to do this before PQcancel; killing the leader transaction will result
567  * in invalid-snapshot errors from active workers, which maybe we can
568  * quiet by killing workers first.) Ignore any errors.
569  */
570  if (signal_info.pstate != NULL)
571  {
572  for (i = 0; i < signal_info.pstate->numWorkers; i++)
573  {
574  pid_t pid = signal_info.pstate->parallelSlot[i].pid;
575 
576  if (pid != 0)
577  kill(pid, SIGTERM);
578  }
579  }
580 
581  /*
582  * Send QueryCancel if we have a connection to send to. Ignore errors,
583  * there's not much we can do about them anyway.
584  */
585  if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
586  (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
587 
588  /*
589  * Report we're quitting, using nothing more complicated than write(2).
590  * When in parallel operation, only the leader process should do this.
591  */
592  if (!signal_info.am_worker)
593  {
594  if (progname)
595  {
597  write_stderr(": ");
598  }
599  write_stderr("terminated by user\n");
600  }
601 
602  /*
603  * And die, using _exit() not exit() because the latter will invoke atexit
604  * handlers that can fail if we interrupted related code.
605  */
606  _exit(1);
607 }
#define SIGQUIT
Definition: win32_port.h:168
const char * progname
Definition: main.c:46
ArchiveHandle * myAH
Definition: parallel.c:167
#define write_stderr(str)
Definition: parallel.c:186
#define kill(pid, sig)
Definition: win32_port.h:464
PGcancel *volatile connCancel
ParallelState * pstate
Definition: parallel.c:168
pid_t pid
Definition: parallel.c:115
#define SIG_IGN
Definition: win32_port.h:164
ParallelSlot * parallelSlot
Definition: parallel.h:60
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4507
int i
int numWorkers
Definition: parallel.h:57

◆ WaitForCommands()

static void WaitForCommands (ArchiveHandle *AH,
int pipefd[2]
)
static

Definition at line 1341 of file parallel.c.

References ACT_DUMP, ACT_RESTORE, Assert, buf, buildWorkerResponse(), free, getMessageFromLeader(), lockTableForWorker(), parseWorkerCommand(), sendMessageToLeader(), status(), _archiveHandle::WorkerJobDumpPtr, and _archiveHandle::WorkerJobRestorePtr.

Referenced by RunWorker().

1342 {
1343  char *command;
1344  TocEntry *te;
1345  T_Action act;
1346  int status = 0;
1347  char buf[256];
1348 
1349  for (;;)
1350  {
1351  if (!(command = getMessageFromLeader(pipefd)))
1352  {
1353  /* EOF, so done */
1354  return;
1355  }
1356 
1357  /* Decode the command */
1358  parseWorkerCommand(AH, &te, &act, command);
1359 
1360  if (act == ACT_DUMP)
1361  {
1362  /* Acquire lock on this table within the worker's session */
1363  lockTableForWorker(AH, te);
1364 
1365  /* Perform the dump command */
1366  status = (AH->WorkerJobDumpPtr) (AH, te);
1367  }
1368  else if (act == ACT_RESTORE)
1369  {
1370  /* Perform the restore command */
1371  status = (AH->WorkerJobRestorePtr) (AH, te);
1372  }
1373  else
1374  Assert(false);
1375 
1376  /* Return status to leader */
1377  buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1378 
1379  sendMessageToLeader(pipefd, buf);
1380 
1381  /* command was pg_malloc'd and we are responsible for free()ing it. */
1382  free(command);
1383  }
1384 }
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
Definition: parallel.c:1161
static char * buf
Definition: pg_test_fsync.c:68
static void sendMessageToLeader(int pipefd[2], const char *str)
Definition: parallel.c:1532
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
Definition: parallel.c:1306
#define free(a)
Definition: header.h:65
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
Definition: parallel.c:1128
#define Assert(condition)
Definition: c.h:804
static char * getMessageFromLeader(int pipefd[2])
Definition: parallel.c:1521
WorkerJobRestorePtrType WorkerJobRestorePtr
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
WorkerJobDumpPtrType WorkerJobDumpPtr

◆ WaitForTerminatingWorkers()

static void WaitForTerminatingWorkers (ParallelState *pstate)
static

Definition at line 451 of file parallel.c.

References Assert, free, HasEveryWorkerTerminated(), ParallelState::numWorkers, ParallelState::parallelSlot, pg_malloc(), ParallelSlot::pid, status(), ParallelState::te, WORKER_IS_RUNNING, ParallelSlot::workerStatus, and WRKR_TERMINATED.

Referenced by ParallelBackupEnd(), and ShutdownWorkersHard().

452 {
453  while (!HasEveryWorkerTerminated(pstate))
454  {
455  ParallelSlot *slot = NULL;
456  int j;
457 
458 #ifndef WIN32
459  /* On non-Windows, use wait() to wait for next worker to end */
460  int status;
461  pid_t pid = wait(&status);
462 
463  /* Find dead worker's slot, and clear the PID field */
464  for (j = 0; j < pstate->numWorkers; j++)
465  {
466  slot = &(pstate->parallelSlot[j]);
467  if (slot->pid == pid)
468  {
469  slot->pid = 0;
470  break;
471  }
472  }
473 #else /* WIN32 */
474  /* On Windows, we must use WaitForMultipleObjects() */
475  HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
476  int nrun = 0;
477  DWORD ret;
478  uintptr_t hThread;
479 
480  for (j = 0; j < pstate->numWorkers; j++)
481  {
483  {
484  lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
485  nrun++;
486  }
487  }
488  ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
489  Assert(ret != WAIT_FAILED);
490  hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
491  free(lpHandles);
492 
493  /* Find dead worker's slot, and clear the hThread field */
494  for (j = 0; j < pstate->numWorkers; j++)
495  {
496  slot = &(pstate->parallelSlot[j]);
497  if (slot->hThread == hThread)
498  {
499  /* For cleanliness, close handles for dead threads */
500  CloseHandle((HANDLE) slot->hThread);
501  slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
502  break;
503  }
504  }
505 #endif /* WIN32 */
506 
507  /* On all platforms, update workerStatus and te[] as well */
508  Assert(j < pstate->numWorkers);
510  pstate->te[j] = NULL;
511  }
512 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static bool HasEveryWorkerTerminated(ParallelState *pstate)
Definition: parallel.c:1257
pid_t pid
Definition: parallel.c:115
ParallelSlot * parallelSlot
Definition: parallel.h:60
#define free(a)
Definition: header.h:65
#define Assert(condition)
Definition: c.h:804
TocEntry ** te
Definition: parallel.h:59
T_WorkerStatus workerStatus
Definition: parallel.c:97
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:85
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:229
int numWorkers
Definition: parallel.h:57

◆ WaitForWorkers()

void WaitForWorkers (ArchiveHandle *AH,
ParallelState *pstate,
WFW_WaitOption mode
)

Definition at line 1456 of file parallel.c.

References Assert, do_wait, GetIdleWorker(), IsEveryWorkerIdle(), ListenToWorkers(), NO_SLOT, WFW_ALL_IDLE, WFW_GOT_STATUS, WFW_NO_WAIT, and WFW_ONE_IDLE.

Referenced by DispatchJobForTocEntry(), restore_toc_entries_parallel(), and WriteDataChunks().

1457 {
1458  bool do_wait = false;
1459 
1460  /*
1461  * In GOT_STATUS mode, always block waiting for a message, since we can't
1462  * return till we get something. In other modes, we don't block the first
1463  * time through the loop.
1464  */
1465  if (mode == WFW_GOT_STATUS)
1466  {
1467  /* Assert that caller knows what it's doing */
1468  Assert(!IsEveryWorkerIdle(pstate));
1469  do_wait = true;
1470  }
1471 
1472  for (;;)
1473  {
1474  /*
1475  * Check for status messages, even if we don't need to block. We do
1476  * not try very hard to reap all available messages, though, since
1477  * there's unlikely to be more than one.
1478  */
1479  if (ListenToWorkers(AH, pstate, do_wait))
1480  {
1481  /*
1482  * If we got a message, we are done by definition for GOT_STATUS
1483  * mode, and we can also be certain that there's at least one idle
1484  * worker. So we're done in all but ALL_IDLE mode.
1485  */
1486  if (mode != WFW_ALL_IDLE)
1487  return;
1488  }
1489 
1490  /* Check whether we must wait for new status messages */
1491  switch (mode)
1492  {
1493  case WFW_NO_WAIT:
1494  return; /* never wait */
1495  case WFW_GOT_STATUS:
1496  Assert(false); /* can't get here, because we waited */
1497  break;
1498  case WFW_ONE_IDLE:
1499  if (GetIdleWorker(pstate) != NO_SLOT)
1500  return;
1501  break;
1502  case WFW_ALL_IDLE:
1503  if (IsEveryWorkerIdle(pstate))
1504  return;
1505  break;
1506  }
1507 
1508  /* Loop back, and this time wait for something to happen */
1509  do_wait = true;
1510  }
1511 }
static PgChecksumMode mode
Definition: pg_checksums.c:65
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1273
static int GetIdleWorker(ParallelState *pstate)
Definition: parallel.c:1241
#define NO_SLOT
Definition: parallel.c:74
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
Definition: parallel.c:1403
#define Assert(condition)
Definition: c.h:804
static bool do_wait
Definition: pg_ctl.c:79

Variable Documentation

◆ shutdown_info

ShutdownInformation shutdown_info
static

Definition at line 154 of file parallel.c.

◆ signal_info

volatile DumpSignalInformation signal_info
static

Definition at line 175 of file parallel.c.