PostgreSQL Source Code  git master
parallel.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *
5  * Parallel support for pg_dump and pg_restore
6  *
7  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  * src/bin/pg_dump/parallel.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 /*
17  * Parallel operation works like this:
18  *
19  * The original, leader process calls ParallelBackupStart(), which forks off
20  * the desired number of worker processes, which each enter WaitForCommands().
21  *
22  * The leader process dispatches an individual work item to one of the worker
23  * processes in DispatchJobForTocEntry(). We send a command string such as
24  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25  * The worker process receives and decodes the command and passes it to the
26  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27  * which are routines of the current archive format. That routine performs
28  * the required action (dump or restore) and returns an integer status code.
29  * This is passed back to the leader where we pass it to the
30  * ParallelCompletionPtr callback function that was passed to
31  * DispatchJobForTocEntry(). The callback function does state updating
32  * for the leader control logic in pg_backup_archiver.c.
33  *
34  * In principle additional archive-format-specific information might be needed
35  * in commands or worker status responses, but so far that hasn't proved
36  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37  * data structures. Remember that we have forked off the workers only after
38  * we have read in the catalog. That's why our worker processes can also
39  * access the catalog information. (In the Windows case, the workers are
40  * threads in the same process. To avoid problems, they work with cloned
41  * copies of the Archive data structure; see RunWorker().)
42  *
43  * In the leader process, the workerStatus field for each worker has one of
44  * the following values:
45  * WRKR_NOT_STARTED: we've not yet forked this worker
46  * WRKR_IDLE: it's waiting for a command
47  * WRKR_WORKING: it's working on a command
48  * WRKR_TERMINATED: process ended
49  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50  * state, and must be NULL in other states.
51  */
52 
53 #include "postgres_fe.h"
54 
55 #ifndef WIN32
56 #include <sys/wait.h>
57 #include <signal.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #endif
61 #ifdef HAVE_SYS_SELECT_H
62 #include <sys/select.h>
63 #endif
64 
65 #include "fe_utils/string_utils.h"
66 #include "parallel.h"
67 #include "pg_backup_utils.h"
68 #include "port/pg_bswap.h"
69 
/*
 * Mnemonic macros for indexing the fd array returned by pipe(2):
 * element 0 is the read end, element 1 the write end.
 */
#define PIPE_READ			0
#define PIPE_WRITE			1

/* Failure result for GetIdleWorker(): no slot index is ever negative */
#define NO_SLOT				(-1)
75 
76 /* Worker process statuses */
/*
 * Life-cycle states of a worker, as tracked by the leader.  The zero value
 * must be WRKR_NOT_STARTED, because the slot array is allocated zero-filled
 * before any worker has been launched.
 */
typedef enum
{
	WRKR_NOT_STARTED = 0,		/* we've not yet forked this worker */
	WRKR_IDLE,					/* it's waiting for a command */
	WRKR_WORKING,				/* it's working on a command */
	WRKR_TERMINATED				/* process ended */
} T_WorkerStatus;

/*
 * True when the worker has been started and has not yet ended.  The argument
 * is evaluated twice, so it must be free of side effects.
 */
#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
87 
88 /*
89  * Private per-parallel-worker state (typedef for this is in parallel.h).
90  *
91  * Much of this is valid only in the leader process (or, on Windows, should
92  * be touched only by the leader thread). But the AH field should be touched
93  * only by workers. The pipe descriptors are valid everywhere.
94  */
96 {
97  T_WorkerStatus workerStatus; /* see enum above */
98 
99  /* These fields are valid if workerStatus == WRKR_WORKING: */
100  ParallelCompletionPtr callback; /* function to call on completion */
101  void *callback_data; /* passthrough data for it */
102 
103  ArchiveHandle *AH; /* Archive data worker is using */
104 
105  int pipeRead; /* leader's end of the pipes */
107  int pipeRevRead; /* child's end of the pipes */
109 
110  /* Child process/thread identity info: */
111 #ifdef WIN32
112  uintptr_t hThread;
113  unsigned int threadId;
114 #else
115  pid_t pid;
116 #endif
117 };
118 
#ifdef WIN32

/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.  It is allocated by the leader and freed
 * by the new thread once the fields have been copied out.
 */
typedef struct
{
	ArchiveHandle *AH;			/* leader database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/* Windows implementation of pipe access: the "pipes" are sockets */
static int	pgpipe(int handles[2]);
#define piperead(a,b,c)		recv(a,b,c,0)
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Non-Windows implementation of pipe access: plain pipe(2) descriptors */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
144 
145 /*
146  * State info for archive_close_connection() shutdown callback.
147  */
148 typedef struct ShutdownInformation
149 {
153 
155 
156 /*
157  * State info for signal handling.
158  * We assume signal_info initializes to zeroes.
159  *
160  * On Unix, myAH is the leader DB connection in the leader process, and the
161  * worker's own connection in worker processes. On Windows, we have only one
162  * instance of signal_info, so myAH is the leader connection and the worker
163  * connections must be dug out of pstate->parallelSlot[].
164  */
165 typedef struct DumpSignalInformation
166 {
167  ArchiveHandle *myAH; /* database connection to issue cancel for */
168  ParallelState *pstate; /* parallel state, if any */
169  bool handler_set; /* signal handler set up in this process? */
170 #ifndef WIN32
171  bool am_worker; /* am I a worker process? */
172 #endif
174 
176 
#ifdef WIN32
/*
 * Protects signal_info and the per-slot AH/connCancel pointers against the
 * console-handler thread.  Initialized in setup_cancel_handler().
 */
static CRITICAL_SECTION signal_info_lock;
#endif
180 
181 /*
182  * Write a simple string to stderr --- must be safe in a signal handler.
183  * We ignore the write() result since there's not much we could do about it.
184  * Certain compilers make that harder than it ought to be.
185  */
#define write_stderr(str) \
	do { \
		const char *str_ = (str);	/* evaluate the argument only once */ \
		int		rc_; \
		rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_;				/* result deliberately discarded */ \
	} while (0)
193 
194 
#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;			/* TLS slot for the per-thread PQExpBuffer */

/* globally visible variables (needed by exit_nicely) */
bool		parallel_init_done = false; /* true once TLS/sockets are set up */
DWORD		mainThreadId;		/* thread that ran the initialization */
#endif							/* WIN32 */
203 
204 /* Local function prototypes */
205 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
206 static void archive_close_connection(int code, void *arg);
207 static void ShutdownWorkersHard(ParallelState *pstate);
208 static void WaitForTerminatingWorkers(ParallelState *pstate);
209 static void setup_cancel_handler(void);
210 static void set_cancel_pstate(ParallelState *pstate);
212 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
213 static int GetIdleWorker(ParallelState *pstate);
214 static bool HasEveryWorkerTerminated(ParallelState *pstate);
215 static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
216 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
217 static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
218  bool do_wait);
219 static char *getMessageFromLeader(int pipefd[2]);
220 static void sendMessageToLeader(int pipefd[2], const char *str);
221 static int select_loop(int maxFd, fd_set *workerset);
222 static char *getMessageFromWorker(ParallelState *pstate,
223  bool do_wait, int *worker);
224 static void sendMessageToWorker(ParallelState *pstate,
225  int worker, const char *str);
226 static char *readMessageFromPipe(int fd);
227 
/*
 * True if the string msg begins with the string prefix.  prefix is evaluated
 * twice, so pass a literal or another side-effect-free expression.
 */
#define messageStartsWith(msg, prefix) \
	(strncmp(msg, prefix, strlen(prefix)) == 0)
230 
231 
232 /*
233  * Initialize parallel dump support --- should be called early in process
234  * startup. (Currently, this is called whether or not we intend parallel
235  * activity.)
236  */
237 void
/*
 * NOTE(review): the line that carried this function's name and parameter
 * list (embedded listing line 238) is absent from this extract; it must be
 * restored from the original file before this block can compile.
 */
239 {
240 #ifdef WIN32
/* Guard makes repeated calls no-ops once initialization has succeeded. */
241  if (!parallel_init_done)
242  {
243  WSADATA wsaData;
244  int err;
245 
246  /* Prepare for threaded operation */
247  tls_index = TlsAlloc();
248  mainThreadId = GetCurrentThreadId();
249 
250  /* Initialize socket access */
251  err = WSAStartup(MAKEWORD(2, 2), &wsaData);
252  if (err != 0)
253  {
254  pg_log_error("WSAStartup failed: %d", err);
255  exit_nicely(1);
256  }
257 
/* Set last, so the flag is never true on the WSAStartup failure path. */
258  parallel_init_done = true;
259  }
260 #endif
/* On non-Windows builds the body is empty. */
261 }
262 
263 /*
264  * Find the ParallelSlot for the current worker process or thread.
265  *
266  * Returns NULL if no matching slot is found (this implies we're the leader).
267  */
268 static ParallelSlot *
270 {
271  int i;
272 
273  for (i = 0; i < pstate->numWorkers; i++)
274  {
275 #ifdef WIN32
276  if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
277 #else
278  if (pstate->parallelSlot[i].pid == getpid())
279 #endif
280  return &(pstate->parallelSlot[i]);
281  }
282 
283  return NULL;
284 }
285 
286 /*
287  * A thread-local version of getLocalPQExpBuffer().
288  *
289  * Non-reentrant but reduces memory leakage: we'll consume one buffer per
290  * thread, which is much better than one per fmtId/fmtQualifiedId call.
291  */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code goes awry if we use a static var, so we provide for both
	 * static and auto, and omit any use of the static var when using Tls. We
	 * rely on TlsGetValue() to return 0 if the value is not yet set.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer id_return;

	/* Fetch the buffer remembered by an earlier call, if there was one */
	if (parallel_init_done)
		id_return = (PQExpBuffer) TlsGetValue(tls_index);
	else
		id_return = s_id_return;

	if (id_return)
	{
		/* not the first call: same buffer, just wipe contents */
		resetPQExpBuffer(id_return);
	}
	else
	{
		/* first time through: make a new buffer and remember it */
		id_return = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, id_return);
		else
			s_id_return = id_return;
	}

	return id_return;
}
#endif							/* WIN32 */
327 
328 /*
329  * pg_dump and pg_restore call this to register the cleanup handler
330  * as soon as they've created the ArchiveHandle.
331  */
332 void
/*
 * NOTE(review): the line that carried this function's name and parameter
 * list (embedded listing line 333) is absent from this extract.  The body
 * shows it receives an archive pointer called AHX.
 */
334 {
/* Remember the archive, then hand the shared state to the exit callback. */
335  shutdown_info.AHX = AHX;
336  on_exit_nicely(archive_close_connection, &shutdown_info);
337 }
338 
339 /*
340  * on_exit_nicely handler for shutting down database connections and
341  * worker processes cleanly.
342  */
343 static void
345 {
347 
348  if (si->pstate)
349  {
350  /* In parallel mode, must figure out who we are */
351  ParallelSlot *slot = GetMyPSlot(si->pstate);
352 
353  if (!slot)
354  {
355  /*
356  * We're the leader. Forcibly shut down workers, then close our
357  * own database connection, if any.
358  */
360 
361  if (si->AHX)
362  DisconnectDatabase(si->AHX);
363  }
364  else
365  {
366  /*
367  * We're a worker. Shut down our own DB connection if any. On
368  * Windows, we also have to close our communication sockets, to
369  * emulate what will happen on Unix when the worker process exits.
370  * (Without this, if this is a premature exit, the leader would
371  * fail to detect it because there would be no EOF condition on
372  * the other end of the pipe.)
373  */
374  if (slot->AH)
375  DisconnectDatabase(&(slot->AH->public));
376 
377 #ifdef WIN32
378  closesocket(slot->pipeRevRead);
379  closesocket(slot->pipeRevWrite);
380 #endif
381  }
382  }
383  else
384  {
385  /* Non-parallel operation: just kill the leader DB connection */
386  if (si->AHX)
387  DisconnectDatabase(si->AHX);
388  }
389 }
390 
391 /*
392  * Forcibly shut down any remaining workers, waiting for them to finish.
393  *
394  * Note that we don't expect to come here during normal exit (the workers
395  * should be long gone, and the ParallelState too). We're only here in a
396  * fatal() situation, so intervening to cancel active commands is
397  * appropriate.
398  */
399 static void
401 {
402  int i;
403 
404  /*
405  * Close our write end of the sockets so that any workers waiting for
406  * commands know they can exit. (Note: some of the pipeWrite fields might
407  * still be zero, if we failed to initialize all the workers. Hence, just
408  * ignore errors here.)
409  */
410  for (i = 0; i < pstate->numWorkers; i++)
411  closesocket(pstate->parallelSlot[i].pipeWrite);
412 
413  /*
414  * Force early termination of any commands currently in progress.
415  */
416 #ifndef WIN32
417  /* On non-Windows, send SIGTERM to each worker process. */
418  for (i = 0; i < pstate->numWorkers; i++)
419  {
420  pid_t pid = pstate->parallelSlot[i].pid;
421 
422  if (pid != 0)
423  kill(pid, SIGTERM);
424  }
425 #else
426 
427  /*
428  * On Windows, send query cancels directly to the workers' backends. Use
429  * a critical section to ensure worker threads don't change state.
430  */
431  EnterCriticalSection(&signal_info_lock);
432  for (i = 0; i < pstate->numWorkers; i++)
433  {
434  ArchiveHandle *AH = pstate->parallelSlot[i].AH;
435  char errbuf[1];
436 
437  if (AH != NULL && AH->connCancel != NULL)
438  (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
439  }
440  LeaveCriticalSection(&signal_info_lock);
441 #endif
442 
443  /* Now wait for them to terminate. */
445 }
446 
447 /*
448  * Wait for all workers to terminate.
449  */
450 static void
452 {
453  while (!HasEveryWorkerTerminated(pstate))
454  {
455  ParallelSlot *slot = NULL;
456  int j;
457 
458 #ifndef WIN32
459  /* On non-Windows, use wait() to wait for next worker to end */
460  int status;
461  pid_t pid = wait(&status);
462 
463  /* Find dead worker's slot, and clear the PID field */
464  for (j = 0; j < pstate->numWorkers; j++)
465  {
466  slot = &(pstate->parallelSlot[j]);
467  if (slot->pid == pid)
468  {
469  slot->pid = 0;
470  break;
471  }
472  }
473 #else /* WIN32 */
474  /* On Windows, we must use WaitForMultipleObjects() */
475  HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
476  int nrun = 0;
477  DWORD ret;
478  uintptr_t hThread;
479 
480  for (j = 0; j < pstate->numWorkers; j++)
481  {
483  {
484  lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
485  nrun++;
486  }
487  }
488  ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
489  Assert(ret != WAIT_FAILED);
490  hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
491  free(lpHandles);
492 
493  /* Find dead worker's slot, and clear the hThread field */
494  for (j = 0; j < pstate->numWorkers; j++)
495  {
496  slot = &(pstate->parallelSlot[j]);
497  if (slot->hThread == hThread)
498  {
499  /* For cleanliness, close handles for dead threads */
500  CloseHandle((HANDLE) slot->hThread);
501  slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
502  break;
503  }
504  }
505 #endif /* WIN32 */
506 
507  /* On all platforms, update workerStatus and te[] as well */
508  Assert(j < pstate->numWorkers);
510  pstate->te[j] = NULL;
511  }
512 }
513 
514 
515 /*
516  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
517  *
518  * This doesn't quite belong in this module, but it needs access to the
519  * ParallelState data, so there's not really a better place either.
520  *
521  * When we get a cancel interrupt, we could just die, but in pg_restore that
522  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
523  * for a long time. Instead, we try to send a cancel request and then die.
524  * pg_dump probably doesn't really need this, but we might as well use it
525  * there too. Note that sending the cancel directly from the signal handler
526  * is safe because PQcancel() is written to make it so.
527  *
528  * In parallel operation on Unix, each process is responsible for canceling
529  * its own connection (this must be so because nobody else has access to it).
530  * Furthermore, the leader process should attempt to forward its signal to
531  * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
532  * needed because typing control-C at the console would deliver SIGINT to
533  * every member of the terminal process group --- but in other scenarios it
534  * might be that only the leader gets signaled.
535  *
536  * On Windows, the cancel handler runs in a separate thread, because that's
537  * how SetConsoleCtrlHandler works. We make it stop worker threads, send
538  * cancels on all active connections, and then return FALSE, which will allow
539  * the process to die. For safety's sake, we use a critical section to
540  * protect the PGcancel structures against being changed while the signal
541  * thread runs.
542  */
543 
544 #ifndef WIN32
545 
546 /*
547  * Signal handler (Unix only)
548  */
549 static void
/*
 * NOTE(review): the line that carried this function's name and parameter
 * list (embedded listing line 550) is absent from this extract.  The handler
 * is installed under the name sigTermHandler by setup_cancel_handler().
 */
551 {
552  int i;
/* One-byte buffer: PQcancel's error text is not wanted, results are ignored */
553  char errbuf[1];
554 
555  /*
556  * Some platforms allow delivery of new signals to interrupt an active
557  * signal handler. That could muck up our attempt to send PQcancel, so
558  * disable the signals that setup_cancel_handler enabled.
559  */
560  pqsignal(SIGINT, SIG_IGN);
561  pqsignal(SIGTERM, SIG_IGN);
/*
 * NOTE(review): embedded listing line 562 is absent from this extract;
 * presumably one more signal is disabled here -- confirm against the
 * original file.
 */
563 
564  /*
565  * If we're in the leader, forward signal to all workers. (It seems best
566  * to do this before PQcancel; killing the leader transaction will result
567  * in invalid-snapshot errors from active workers, which maybe we can
568  * quiet by killing workers first.) Ignore any errors.
569  */
570  if (signal_info.pstate != NULL)
571  {
572  for (i = 0; i < signal_info.pstate->numWorkers; i++)
573  {
574  pid_t pid = signal_info.pstate->parallelSlot[i].pid;
575 
/* A zero pid marks a slot with no live worker to signal. */
576  if (pid != 0)
577  kill(pid, SIGTERM);
578  }
579  }
580 
581  /*
582  * Send QueryCancel if we have a connection to send to. Ignore errors,
583  * there's not much we can do about them anyway.
584  */
585  if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
586  (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
587 
588  /*
589  * Report we're quitting, using nothing more complicated than write(2).
590  * When in parallel operation, only the leader process should do this.
591  */
592  if (!signal_info.am_worker)
593  {
594  if (progname)
595  {
/*
 * NOTE(review): embedded listing line 596 is absent from this extract;
 * presumably it writes progname ahead of the ": " separator -- confirm.
 */
597  write_stderr(": ");
598  }
599  write_stderr("terminated by user\n");
600  }
601 
602  /*
603  * And die, using _exit() not exit() because the latter will invoke atexit
604  * handlers that can fail if we interrupted related code.
605  */
606  _exit(1);
607 }
608 
609 /*
610  * Enable cancel interrupt handler, if not already done.
611  */
612 static void
/*
 * NOTE(review): the name/parameter line (embedded listing line 613) is
 * absent from this extract; the file's prototype list declares
 * setup_cancel_handler(void).
 */
614 {
615  /*
616  * When forking, signal_info.handler_set will propagate into the new
617  * process, but that's fine because the signal handler state does too.
618  */
619  if (!signal_info.handler_set)
620  {
/* Mark as done before installing, so the setup runs once per process. */
621  signal_info.handler_set = true;
622 
623  pqsignal(SIGINT, sigTermHandler);
624  pqsignal(SIGTERM, sigTermHandler);
/*
 * NOTE(review): embedded listing line 625 is absent from this extract;
 * presumably one more signal is routed to sigTermHandler -- confirm.
 */
626  }
627 }
628 
629 #else /* WIN32 */
630 
631 /*
632  * Console interrupt handler --- runs in a newly-started thread.
633  *
634  * After stopping other threads and sending cancel requests on all open
635  * connections, we return FALSE which will allow the default ExitProcess()
636  * action to be taken.
637  */
638 static BOOL WINAPI
639 consoleHandler(DWORD dwCtrlType)
640 {
641  int i;
642  char errbuf[1];
643 
644  if (dwCtrlType == CTRL_C_EVENT ||
645  dwCtrlType == CTRL_BREAK_EVENT)
646  {
647  /* Critical section prevents changing data we look at here */
648  EnterCriticalSection(&signal_info_lock);
649 
650  /*
651  * If in parallel mode, stop worker threads and send QueryCancel to
652  * their connected backends. The main point of stopping the worker
653  * threads is to keep them from reporting the query cancels as errors,
654  * which would clutter the user's screen. We needn't stop the leader
655  * thread since it won't be doing much anyway. Do this before
656  * canceling the main transaction, else we might get invalid-snapshot
657  * errors reported before we can stop the workers. Ignore errors,
658  * there's not much we can do about them anyway.
659  */
660  if (signal_info.pstate != NULL)
661  {
662  for (i = 0; i < signal_info.pstate->numWorkers; i++)
663  {
664  ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
665  ArchiveHandle *AH = slot->AH;
666  HANDLE hThread = (HANDLE) slot->hThread;
667 
668  /*
669  * Using TerminateThread here may leave some resources leaked,
670  * but it doesn't matter since we're about to end the whole
671  * process.
672  */
673  if (hThread != INVALID_HANDLE_VALUE)
674  TerminateThread(hThread, 0);
675 
676  if (AH != NULL && AH->connCancel != NULL)
677  (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
678  }
679  }
680 
681  /*
682  * Send QueryCancel to leader connection, if enabled. Ignore errors,
683  * there's not much we can do about them anyway.
684  */
685  if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
686  (void) PQcancel(signal_info.myAH->connCancel,
687  errbuf, sizeof(errbuf));
688 
689  LeaveCriticalSection(&signal_info_lock);
690 
691  /*
692  * Report we're quitting, using nothing more complicated than
693  * write(2). (We might be able to get away with using pg_log_*()
694  * here, but since we terminated other threads uncleanly above, it
695  * seems better to assume as little as possible.)
696  */
697  if (progname)
698  {
700  write_stderr(": ");
701  }
702  write_stderr("terminated by user\n");
703  }
704 
705  /* Always return FALSE to allow signal handling to continue */
706  return FALSE;
707 }
708 
709 /*
710  * Enable cancel interrupt handler, if not already done.
711  */
712 static void
714 {
715  if (!signal_info.handler_set)
716  {
717  signal_info.handler_set = true;
718 
719  InitializeCriticalSection(&signal_info_lock);
720 
721  SetConsoleCtrlHandler(consoleHandler, TRUE);
722  }
723 }
724 
725 #endif /* WIN32 */
726 
727 
728 /*
729  * set_archive_cancel_info
730  *
731  * Fill AH->connCancel with cancellation info for the specified database
732  * connection; or clear it if conn is NULL.
733  */
734 void
736 {
737  PGcancel *oldConnCancel;
738 
739  /*
740  * Activate the interrupt handler if we didn't yet in this process. On
741  * Windows, this also initializes signal_info_lock; therefore it's
742  * important that this happen at least once before we fork off any
743  * threads.
744  */
746 
747  /*
748  * On Unix, we assume that storing a pointer value is atomic with respect
749  * to any possible signal interrupt. On Windows, use a critical section.
750  */
751 
752 #ifdef WIN32
753  EnterCriticalSection(&signal_info_lock);
754 #endif
755 
756  /* Free the old one if we have one */
757  oldConnCancel = AH->connCancel;
758  /* be sure interrupt handler doesn't use pointer while freeing */
759  AH->connCancel = NULL;
760 
761  if (oldConnCancel != NULL)
762  PQfreeCancel(oldConnCancel);
763 
764  /* Set the new one if specified */
765  if (conn)
766  AH->connCancel = PQgetCancel(conn);
767 
768  /*
769  * On Unix, there's only ever one active ArchiveHandle per process, so we
770  * can just set signal_info.myAH unconditionally. On Windows, do that
771  * only in the main thread; worker threads have to make sure their
772  * ArchiveHandle appears in the pstate data, which is dealt with in
773  * RunWorker().
774  */
775 #ifndef WIN32
776  signal_info.myAH = AH;
777 #else
778  if (mainThreadId == GetCurrentThreadId())
779  signal_info.myAH = AH;
780 #endif
781 
782 #ifdef WIN32
783  LeaveCriticalSection(&signal_info_lock);
784 #endif
785 }
786 
787 /*
788  * set_cancel_pstate
789  *
790  * Set signal_info.pstate to point to the specified ParallelState, if any.
791  * We need this mainly to have an interlock against Windows signal thread.
792  */
793 static void
795 {
796 #ifdef WIN32
797  EnterCriticalSection(&signal_info_lock);
798 #endif
799 
800  signal_info.pstate = pstate;
801 
802 #ifdef WIN32
803  LeaveCriticalSection(&signal_info_lock);
804 #endif
805 }
806 
807 /*
808  * set_cancel_slot_archive
809  *
810  * Set ParallelSlot's AH field to point to the specified archive, if any.
811  * We need this mainly to have an interlock against Windows signal thread.
812  */
813 static void
815 {
816 #ifdef WIN32
817  EnterCriticalSection(&signal_info_lock);
818 #endif
819 
820  slot->AH = AH;
821 
822 #ifdef WIN32
823  LeaveCriticalSection(&signal_info_lock);
824 #endif
825 }
826 
827 
828 /*
829  * This function is called by both Unix and Windows variants to set up
830  * and run a worker process. Caller should exit the process (or thread)
831  * upon return.
832  */
833 static void
835 {
836  int pipefd[2];
837 
838  /* fetch child ends of pipes */
839  pipefd[PIPE_READ] = slot->pipeRevRead;
840  pipefd[PIPE_WRITE] = slot->pipeRevWrite;
841 
842  /*
843  * Clone the archive so that we have our own state to work with, and in
844  * particular our own database connection.
845  *
846  * We clone on Unix as well as Windows, even though technically we don't
847  * need to because fork() gives us a copy in our own address space
848  * already. But CloneArchive resets the state information and also clones
849  * the database connection which both seem kinda helpful.
850  */
851  AH = CloneArchive(AH);
852 
853  /* Remember cloned archive where signal handler can find it */
854  set_cancel_slot_archive(slot, AH);
855 
856  /*
857  * Call the setup worker function that's defined in the ArchiveHandle.
858  */
859  (AH->SetupWorkerPtr) ((Archive *) AH);
860 
861  /*
862  * Execute commands until done.
863  */
864  WaitForCommands(AH, pipefd);
865 
866  /*
867  * Disconnect from database and clean up.
868  */
869  set_cancel_slot_archive(slot, NULL);
870  DisconnectDatabase(&(AH->public));
871  DeCloneArchive(AH);
872 }
873 
874 /*
875  * Thread base function for Windows
876  */
#ifdef WIN32
/*
 * wi: heap-allocated argument block from the leader; this function takes
 * ownership of it and frees it after copying the fields out.
 */
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *AH = wi->AH;
	ParallelSlot *slot = wi->slot;

	/* Don't need WorkerInfo anymore */
	free(wi);

	/* Run the worker ... */
	RunWorker(AH, slot);

	/* Exit the thread */
	_endthreadex(0);
	return 0;					/* not reached; keeps the compiler quiet */
}
#endif							/* WIN32 */
895 
896 /*
897  * This function starts a parallel dump or restore by spawning off the worker
898  * processes. For Windows, it creates a number of threads; on Unix the
899  * workers are created with fork().
900  */
/*
 * NOTE(review): the return-type and name/parameter lines of this function
 * (embedded listing lines 901-902) are absent from this extract.  The body
 * shows it takes an ArchiveHandle pointer AH and returns the ParallelState
 * it allocates; the file header calls the entry point ParallelBackupStart().
 */
903 {
904  ParallelState *pstate;
905  int i;
906 
907  Assert(AH->public.numWorkers > 0);
908 
909  pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
910 
911  pstate->numWorkers = AH->public.numWorkers;
912  pstate->te = NULL;
913  pstate->parallelSlot = NULL;
914 
/* With a single worker nothing is spawned; the arrays stay NULL. */
915  if (AH->public.numWorkers == 1)
916  return pstate;
917 
918  /* Create status arrays, being sure to initialize all fields to 0 */
919  pstate->te = (TocEntry **)
920  pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
921  pstate->parallelSlot = (ParallelSlot *)
922  pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
923 
924 #ifdef WIN32
925  /* Make fmtId() and fmtQualifiedId() use thread-local storage */
926  getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
927 #endif
928 
929  /*
930  * Set the pstate in shutdown_info, to tell the exit handler that it must
931  * clean up workers as well as the main database connection. But we don't
932  * set this in signal_info yet, because we don't want child processes to
933  * inherit non-NULL signal_info.pstate.
934  */
935  shutdown_info.pstate = pstate;
936 
937  /*
938  * Temporarily disable query cancellation on the leader connection. This
939  * ensures that child processes won't inherit valid AH->connCancel
940  * settings and thus won't try to issue cancels against the leader's
941  * connection. No harm is done if we fail while it's disabled, because
942  * the leader connection is idle at this point anyway.
943  */
944  set_archive_cancel_info(AH, NULL);
945 
946  /* Ensure stdio state is quiesced before forking */
947  fflush(NULL);
948 
949  /* Create desired number of workers */
950  for (i = 0; i < pstate->numWorkers; i++)
951  {
952 #ifdef WIN32
953  WorkerInfo *wi;
954  uintptr_t handle;
955 #else
956  pid_t pid;
957 #endif
958  ParallelSlot *slot = &(pstate->parallelSlot[i]);
/* pipeMW carries leader -> worker traffic, pipeWM worker -> leader. */
959  int pipeMW[2],
960  pipeWM[2];
961 
962  /* Create communication pipes for this worker */
963  if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
964  fatal("could not create communication channels: %m");
965 
966  /* leader's ends of the pipes */
967  slot->pipeRead = pipeWM[PIPE_READ];
968  slot->pipeWrite = pipeMW[PIPE_WRITE];
969  /* child's ends of the pipes */
970  slot->pipeRevRead = pipeMW[PIPE_READ];
971  slot->pipeRevWrite = pipeWM[PIPE_WRITE];
972 
973 #ifdef WIN32
974  /* Create transient structure to pass args to worker function */
975  wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
976 
977  wi->AH = AH;
978  wi->slot = slot;
979 
/* NOTE(review): the _beginthreadex() result is stored without a failure check. */
980  handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
981  wi, 0, &(slot->threadId));
982  slot->hThread = handle;
983  slot->workerStatus = WRKR_IDLE;
984 #else /* !WIN32 */
985  pid = fork();
986  if (pid == 0)
987  {
988  /* we are the worker */
989  int j;
990 
991  /* this is needed for GetMyPSlot() */
992  slot->pid = getpid();
993 
994  /* instruct signal handler that we're in a worker now */
995  signal_info.am_worker = true;
996 
997  /* close read end of Worker -> Leader */
998  closesocket(pipeWM[PIPE_READ]);
999  /* close write end of Leader -> Worker */
1000  closesocket(pipeMW[PIPE_WRITE]);
1001 
1002  /*
1003  * Close all inherited fds for communication of the leader with
1004  * previously-forked workers.
1005  */
1006  for (j = 0; j < i; j++)
1007  {
1008  closesocket(pstate->parallelSlot[j].pipeRead);
1009  closesocket(pstate->parallelSlot[j].pipeWrite);
1010  }
1011 
1012  /* Run the worker ... */
1013  RunWorker(AH, slot);
1014 
1015  /* We can just exit(0) when done */
1016  exit(0);
1017  }
1018  else if (pid < 0)
1019  {
1020  /* fork failed */
1021  fatal("could not create worker process: %m");
1022  }
1023 
1024  /* In Leader after successful fork */
1025  slot->pid = pid;
1026  slot->workerStatus = WRKR_IDLE;
1027 
1028  /* close read end of Leader -> Worker */
1029  closesocket(pipeMW[PIPE_READ]);
1030  /* close write end of Worker -> Leader */
1031  closesocket(pipeWM[PIPE_WRITE]);
1032 #endif /* WIN32 */
1033  }
1034 
1035  /*
1036  * Having forked off the workers, disable SIGPIPE so that leader isn't
1037  * killed if it tries to send a command to a dead worker. We don't want
1038  * the workers to inherit this setting, though.
1039  */
1040 #ifndef WIN32
/*
 * NOTE(review): embedded listing line 1041 is absent from this extract; the
 * statement that actually disables SIGPIPE must be restored here.
 */
1042 #endif
1043 
1044  /*
1045  * Re-establish query cancellation on the leader connection.
1046  */
/*
 * NOTE(review): embedded listing line 1047 is absent from this extract; the
 * call that re-enables cancellation (the counterpart of the
 * set_archive_cancel_info(AH, NULL) call above) must be restored here.
 */
1048 
1049  /*
1050  * Tell the cancel signal handler to forward signals to worker processes,
1051  * too. (As with query cancel, we did not need this earlier because the
1052  * workers have not yet been given anything to do; if we die before this
1053  * point, any already-started workers will see EOF and quit promptly.)
1054  */
1055  set_cancel_pstate(pstate);
1056 
1057  return pstate;
1058 }
1059 
1060 /*
1061  * Close down a parallel dump or restore.
1062  */
1063 void
/*
 * NOTE(review): the line that carried this function's name and parameter
 * list (embedded listing line 1064) is absent from this extract.  The body
 * uses a ParallelState pointer named pstate.
 */
1065 {
1066  int i;
1067 
1068  /* No work if non-parallel */
/* NOTE(review): on this early return pstate itself is not freed here. */
1069  if (pstate->numWorkers == 1)
1070  return;
1071 
1072  /* There should not be any unfinished jobs */
1073  Assert(IsEveryWorkerIdle(pstate));
1074 
1075  /* Close the sockets so that the workers know they can exit */
1076  for (i = 0; i < pstate->numWorkers; i++)
1077  {
1078  closesocket(pstate->parallelSlot[i].pipeRead);
1079  closesocket(pstate->parallelSlot[i].pipeWrite);
1080  }
1081 
1082  /* Wait for them to exit */
1083  WaitForTerminatingWorkers(pstate);
1084 
1085  /*
1086  * Unlink pstate from shutdown_info, so the exit handler will not try to
1087  * use it; and likewise unlink from signal_info.
1088  */
1089  shutdown_info.pstate = NULL;
1090  set_cancel_pstate(NULL);
1091 
1092  /* Release state (mere neatnik-ism, since we're about to terminate) */
1093  free(pstate->te);
1094  free(pstate->parallelSlot);
1095  free(pstate);
1096 }
1097 
1098 /*
1099  * These next four functions handle construction and parsing of the command
1100  * strings and response strings for parallel workers.
1101  *
1102  * Currently, these can be the same regardless of which archive format we are
1103  * processing. In future, we might want to let format modules override these
1104  * functions to add format-specific data to a command or response.
1105  */
1106 
1107 /*
1108  * buildWorkerCommand: format a command string to send to a worker.
1109  *
1110  * The string is built in the caller-supplied buffer of size buflen.
1111  */
1112 static void
1114  char *buf, int buflen)
1115 {
1116  if (act == ACT_DUMP)
1117  snprintf(buf, buflen, "DUMP %d", te->dumpId);
1118  else if (act == ACT_RESTORE)
1119  snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1120  else
1121  Assert(false);
1122 }
1123 
1124 /*
1125  * parseWorkerCommand: interpret a command string in a worker.
1126  */
1127 static void
1129  const char *msg)
1130 {
1131  DumpId dumpId;
1132  int nBytes;
1133 
1134  if (messageStartsWith(msg, "DUMP "))
1135  {
1136  *act = ACT_DUMP;
1137  sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1138  Assert(nBytes == strlen(msg));
1139  *te = getTocEntryByDumpId(AH, dumpId);
1140  Assert(*te != NULL);
1141  }
1142  else if (messageStartsWith(msg, "RESTORE "))
1143  {
1144  *act = ACT_RESTORE;
1145  sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1146  Assert(nBytes == strlen(msg));
1147  *te = getTocEntryByDumpId(AH, dumpId);
1148  Assert(*te != NULL);
1149  }
1150  else
1151  fatal("unrecognized command received from leader: \"%s\"",
1152  msg);
1153 }
1154 
1155 /*
1156  * buildWorkerResponse: format a response string to send to the leader.
1157  *
1158  * The string is built in the caller-supplied buffer of size buflen.
1159  */
1160 static void
1162  char *buf, int buflen)
1163 {
1164  snprintf(buf, buflen, "OK %d %d %d",
1165  te->dumpId,
1166  status,
1167  status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1168 }
1169 
1170 /*
1171  * parseWorkerResponse: parse the status message returned by a worker.
1172  *
1173  * Returns the integer status code, and may update fields of AH and/or te.
1174  */
1175 static int
1177  const char *msg)
1178 {
1179  DumpId dumpId;
1180  int nBytes,
1181  n_errors;
1182  int status = 0;
1183 
1184  if (messageStartsWith(msg, "OK "))
1185  {
1186  sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1187 
1188  Assert(dumpId == te->dumpId);
1189  Assert(nBytes == strlen(msg));
1190 
1191  AH->public.n_errors += n_errors;
1192  }
1193  else
1194  fatal("invalid message received from worker: \"%s\"",
1195  msg);
1196 
1197  return status;
1198 }
1199 
1200 /*
1201  * Dispatch a job to some free worker.
1202  *
1203  * te is the TocEntry to be processed, act is the action to be taken on it.
1204  * callback is the function to call on completion of the job.
1205  *
1206  * If no worker is currently available, this will block, and previously
1207  * registered callback functions may be called.
1208  */
1209 void
1211  ParallelState *pstate,
1212  TocEntry *te,
1213  T_Action act,
1215  void *callback_data)
1216 {
1217  int worker;
1218  char buf[256];
1219 
1220  /* Get a worker, waiting if none are idle */
1221  while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1222  WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1223 
1224  /* Construct and send command string */
1225  buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1226 
1227  sendMessageToWorker(pstate, worker, buf);
1228 
1229  /* Remember worker is busy, and which TocEntry it's working on */
1230  pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1231  pstate->parallelSlot[worker].callback = callback;
1232  pstate->parallelSlot[worker].callback_data = callback_data;
1233  pstate->te[worker] = te;
1234 }
1235 
1236 /*
1237  * Find an idle worker and return its slot number.
1238  * Return NO_SLOT if none are idle.
1239  */
1240 static int
1242 {
1243  int i;
1244 
1245  for (i = 0; i < pstate->numWorkers; i++)
1246  {
1247  if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1248  return i;
1249  }
1250  return NO_SLOT;
1251 }
1252 
1253 /*
1254  * Return true iff no worker is running.
1255  */
1256 static bool
1258 {
1259  int i;
1260 
1261  for (i = 0; i < pstate->numWorkers; i++)
1262  {
1264  return false;
1265  }
1266  return true;
1267 }
1268 
1269 /*
1270  * Return true iff every worker is in the WRKR_IDLE state.
1271  */
1272 bool
1274 {
1275  int i;
1276 
1277  for (i = 0; i < pstate->numWorkers; i++)
1278  {
1279  if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1280  return false;
1281  }
1282  return true;
1283 }
1284 
1285 /*
1286  * Acquire lock on a table to be dumped by a worker process.
1287  *
1288  * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1289  * it's no problem for a worker to get one too, but if anything else besides
1290  * pg_dump is running, there's a possible deadlock:
1291  *
1292  * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1293  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1294  * because the leader holds a conflicting ACCESS SHARE lock).
1295  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1296  * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1297  * 4) Now we have a deadlock, since the leader is effectively waiting for
1298  * the worker. The server cannot detect that, however.
1299  *
1300  * To prevent an infinite wait, prior to touching a table in a worker, request
1301  * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1302  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1303  * so we have a deadlock. We must fail the backup in that case.
1304  */
1305 static void
1307 {
1308  const char *qualId;
1309  PQExpBuffer query;
1310  PGresult *res;
1311 
1312  /* Nothing to do for BLOBS */
1313  if (strcmp(te->desc, "BLOBS") == 0)
1314  return;
1315 
1316  query = createPQExpBuffer();
1317 
1318  qualId = fmtQualifiedId(te->namespace, te->tag);
1319 
1320  appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1321  qualId);
1322 
1323  res = PQexec(AH->connection, query->data);
1324 
1325  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1326  fatal("could not obtain lock on relation \"%s\"\n"
1327  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1328  "on the table after the pg_dump parent process had gotten the "
1329  "initial ACCESS SHARE lock on the table.", qualId);
1330 
1331  PQclear(res);
1332  destroyPQExpBuffer(query);
1333 }
1334 
1335 /*
1336  * WaitForCommands: main routine for a worker process.
1337  *
1338  * Read and execute commands from the leader until we see EOF on the pipe.
1339  */
1340 static void
1342 {
1343  char *command;
1344  TocEntry *te;
1345  T_Action act;
1346  int status = 0;
1347  char buf[256];
1348 
1349  for (;;)
1350  {
1351  if (!(command = getMessageFromLeader(pipefd)))
1352  {
1353  /* EOF, so done */
1354  return;
1355  }
1356 
1357  /* Decode the command */
1358  parseWorkerCommand(AH, &te, &act, command);
1359 
1360  if (act == ACT_DUMP)
1361  {
1362  /* Acquire lock on this table within the worker's session */
1363  lockTableForWorker(AH, te);
1364 
1365  /* Perform the dump command */
1366  status = (AH->WorkerJobDumpPtr) (AH, te);
1367  }
1368  else if (act == ACT_RESTORE)
1369  {
1370  /* Perform the restore command */
1371  status = (AH->WorkerJobRestorePtr) (AH, te);
1372  }
1373  else
1374  Assert(false);
1375 
1376  /* Return status to leader */
1377  buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1378 
1379  sendMessageToLeader(pipefd, buf);
1380 
1381  /* command was pg_malloc'd and we are responsible for free()ing it. */
1382  free(command);
1383  }
1384 }
1385 
1386 /*
1387  * Check for status messages from workers.
1388  *
1389  * If do_wait is true, wait to get a status message; otherwise, just return
1390  * immediately if there is none available.
1391  *
1392  * When we get a status message, we pass the status code to the callback
1393  * function that was specified to DispatchJobForTocEntry, then reset the
1394  * worker status to IDLE.
1395  *
1396  * Returns true if we collected a status message, else false.
1397  *
1398  * XXX is it worth checking for more than one status message per call?
1399  * It seems somewhat unlikely that multiple workers would finish at exactly
1400  * the same time.
1401  */
1402 static bool
1404 {
1405  int worker;
1406  char *msg;
1407 
1408  /* Try to collect a status message */
1409  msg = getMessageFromWorker(pstate, do_wait, &worker);
1410 
1411  if (!msg)
1412  {
1413  /* If do_wait is true, we must have detected EOF on some socket */
1414  if (do_wait)
1415  fatal("a worker process died unexpectedly");
1416  return false;
1417  }
1418 
1419  /* Process it and update our idea of the worker's status */
1420  if (messageStartsWith(msg, "OK "))
1421  {
1422  ParallelSlot *slot = &pstate->parallelSlot[worker];
1423  TocEntry *te = pstate->te[worker];
1424  int status;
1425 
1426  status = parseWorkerResponse(AH, te, msg);
1427  slot->callback(AH, te, status, slot->callback_data);
1428  slot->workerStatus = WRKR_IDLE;
1429  pstate->te[worker] = NULL;
1430  }
1431  else
1432  fatal("invalid message received from worker: \"%s\"",
1433  msg);
1434 
1435  /* Free the string returned from getMessageFromWorker */
1436  free(msg);
1437 
1438  return true;
1439 }
1440 
1441 /*
1442  * Check for status results from workers, waiting if necessary.
1443  *
1444  * Available wait modes are:
1445  * WFW_NO_WAIT: reap any available status, but don't block
1446  * WFW_GOT_STATUS: wait for at least one more worker to finish
1447  * WFW_ONE_IDLE: wait for at least one worker to be idle
1448  * WFW_ALL_IDLE: wait for all workers to be idle
1449  *
1450  * Any received results are passed to the callback specified to
1451  * DispatchJobForTocEntry.
1452  *
1453  * This function is executed in the leader process.
1454  */
1455 void
1457 {
1458  bool do_wait = false;
1459 
1460  /*
1461  * In GOT_STATUS mode, always block waiting for a message, since we can't
1462  * return till we get something. In other modes, we don't block the first
1463  * time through the loop.
1464  */
1465  if (mode == WFW_GOT_STATUS)
1466  {
1467  /* Assert that caller knows what it's doing */
1468  Assert(!IsEveryWorkerIdle(pstate));
1469  do_wait = true;
1470  }
1471 
1472  for (;;)
1473  {
1474  /*
1475  * Check for status messages, even if we don't need to block. We do
1476  * not try very hard to reap all available messages, though, since
1477  * there's unlikely to be more than one.
1478  */
1479  if (ListenToWorkers(AH, pstate, do_wait))
1480  {
1481  /*
1482  * If we got a message, we are done by definition for GOT_STATUS
1483  * mode, and we can also be certain that there's at least one idle
1484  * worker. So we're done in all but ALL_IDLE mode.
1485  */
1486  if (mode != WFW_ALL_IDLE)
1487  return;
1488  }
1489 
1490  /* Check whether we must wait for new status messages */
1491  switch (mode)
1492  {
1493  case WFW_NO_WAIT:
1494  return; /* never wait */
1495  case WFW_GOT_STATUS:
1496  Assert(false); /* can't get here, because we waited */
1497  break;
1498  case WFW_ONE_IDLE:
1499  if (GetIdleWorker(pstate) != NO_SLOT)
1500  return;
1501  break;
1502  case WFW_ALL_IDLE:
1503  if (IsEveryWorkerIdle(pstate))
1504  return;
1505  break;
1506  }
1507 
1508  /* Loop back, and this time wait for something to happen */
1509  do_wait = true;
1510  }
1511 }
1512 
1513 /*
1514  * Read one command message from the leader, blocking if necessary
1515  * until one is available, and return it as a malloc'd string.
1516  * On EOF, return NULL.
1517  *
1518  * This function is executed in worker processes.
1519  */
1520 static char *
1521 getMessageFromLeader(int pipefd[2])
1522 {
1523  return readMessageFromPipe(pipefd[PIPE_READ]);
1524 }
1525 
1526 /*
1527  * Send a status message to the leader.
1528  *
1529  * This function is executed in worker processes.
1530  */
1531 static void
1532 sendMessageToLeader(int pipefd[2], const char *str)
1533 {
1534  int len = strlen(str) + 1;
1535 
1536  if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1537  fatal("could not write to the communication channel: %m");
1538 }
1539 
/*
 * Block until at least one descriptor in "workerset" is readable.
 *
 * maxFd is the highest descriptor number present in the set.  On return,
 * *workerset holds the result of the last select() call.  A select() call
 * that fails with an interrupt error is retried with the set restored to
 * the caller's original contents.
 *
 * Returns -1 on error, else the number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
    const fd_set wanted = *workerset;
    int         nready;

    for (;;)
    {
        bool        interrupted;

        /* select() overwrites the set, so start from the original each time */
        *workerset = wanted;
        nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
        interrupted = (nready < 0 && errno == EINTR);
#else
        interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

        if (!interrupted)
            return nready;
    }
}
1567 
1568 
1569 /*
1570  * Check for messages from worker processes.
1571  *
1572  * If a message is available, return it as a malloc'd string, and put the
1573  * index of the sending worker in *worker.
1574  *
1575  * If nothing is available, wait if "do_wait" is true, else return NULL.
1576  *
1577  * If we detect EOF on any socket, we'll return NULL. It's not great that
1578  * that's hard to distinguish from the no-data-available case, but for now
1579  * our one caller is okay with that.
1580  *
1581  * This function is executed in the leader process.
1582  */
1583 static char *
1584 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1585 {
1586  int i;
1587  fd_set workerset;
1588  int maxFd = -1;
1589  struct timeval nowait = {0, 0};
1590 
1591  /* construct bitmap of socket descriptors for select() */
1592  FD_ZERO(&workerset);
1593  for (i = 0; i < pstate->numWorkers; i++)
1594  {
1595  if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1596  continue;
1597  FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1598  if (pstate->parallelSlot[i].pipeRead > maxFd)
1599  maxFd = pstate->parallelSlot[i].pipeRead;
1600  }
1601 
1602  if (do_wait)
1603  {
1604  i = select_loop(maxFd, &workerset);
1605  Assert(i != 0);
1606  }
1607  else
1608  {
1609  if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1610  return NULL;
1611  }
1612 
1613  if (i < 0)
1614  fatal("select() failed: %m");
1615 
1616  for (i = 0; i < pstate->numWorkers; i++)
1617  {
1618  char *msg;
1619 
1620  if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1621  continue;
1622  if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1623  continue;
1624 
1625  /*
1626  * Read the message if any. If the socket is ready because of EOF,
1627  * we'll return NULL instead (and the socket will stay ready, so the
1628  * condition will persist).
1629  *
1630  * Note: because this is a blocking read, we'll wait if only part of
1631  * the message is available. Waiting a long time would be bad, but
1632  * since worker status messages are short and are always sent in one
1633  * operation, it shouldn't be a problem in practice.
1634  */
1635  msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1636  *worker = i;
1637  return msg;
1638  }
1639  Assert(false);
1640  return NULL;
1641 }
1642 
1643 /*
1644  * Send a command message to the specified worker process.
1645  *
1646  * This function is executed in the leader process.
1647  */
1648 static void
1649 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1650 {
1651  int len = strlen(str) + 1;
1652 
1653  if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1654  {
1655  fatal("could not write to the communication channel: %m");
1656  }
1657 }
1658 
/*
 * Read one message from the specified pipe (fd), blocking if necessary
 * until one is available, and return it as a malloc'd string.
 * On EOF, return NULL.
 *
 * A "message" on the channel is just a null-terminated string.
 *
 * A read that returns zero or a negative value ends the attempt: any
 * partially collected message is discarded and NULL is returned.
 */
static char *
readMessageFromPipe(int fd)
{
    char       *msg;
    int         msgsize,
                bufsize;
    int         ret;

    /*
     * In theory, if we let piperead() read multiple bytes, it might give us
     * back fragments of multiple messages.  (That can't actually occur, since
     * neither leader nor workers send more than one message without waiting
     * for a reply, but we don't wish to assume that here.)  For simplicity,
     * read a byte at a time until we get the terminating '\0'.  This method
     * is a bit inefficient, but since this is only used for relatively short
     * command and status strings, it shouldn't matter.
     */
    bufsize = 64;               /* could be any number */
    msg = (char *) pg_malloc(bufsize);
    msgsize = 0;
    for (;;)
    {
        Assert(msgsize < bufsize);
        ret = piperead(fd, msg + msgsize, 1);
        if (ret <= 0)
            break;              /* error or connection closure */

        Assert(ret == 1);

        if (msg[msgsize] == '\0')
            return msg;         /* collected whole message */

        msgsize++;
        if (msgsize == bufsize) /* enlarge buffer if needed */
        {
            bufsize += 16;      /* could be any number */
            msg = (char *) pg_realloc(msg, bufsize);
        }
    }

    /* Other end has closed the connection */
    pg_free(msg);
    return NULL;
}
1710 
1711 #ifdef WIN32
1712 
1713 /*
1714  * This is a replacement version of pipe(2) for Windows which allows the pipe
1715  * handles to be used in select().
1716  *
1717  * Reads and writes on the pipe must go through piperead()/pipewrite().
1718  *
1719  * For consistency with Unix we declare the returned handles as "int".
1720  * This is okay even on WIN64 because system handles are not more than
1721  * 32 bits wide, but we do have to do some casting.
1722  */
1723 static int
1724 pgpipe(int handles[2])
1725 {
1726  pgsocket s,
1727  tmp_sock;
1728  struct sockaddr_in serv_addr;
1729  int len = sizeof(serv_addr);
1730 
1731  /* We have to use the Unix socket invalid file descriptor value here. */
1732  handles[0] = handles[1] = -1;
1733 
1734  /*
1735  * setup listen socket
1736  */
1737  if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1738  {
1739  pg_log_error("pgpipe: could not create socket: error code %d",
1740  WSAGetLastError());
1741  return -1;
1742  }
1743 
1744  memset((void *) &serv_addr, 0, sizeof(serv_addr));
1745  serv_addr.sin_family = AF_INET;
1746  serv_addr.sin_port = pg_hton16(0);
1747  serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1748  if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1749  {
1750  pg_log_error("pgpipe: could not bind: error code %d",
1751  WSAGetLastError());
1752  closesocket(s);
1753  return -1;
1754  }
1755  if (listen(s, 1) == SOCKET_ERROR)
1756  {
1757  pg_log_error("pgpipe: could not listen: error code %d",
1758  WSAGetLastError());
1759  closesocket(s);
1760  return -1;
1761  }
1762  if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1763  {
1764  pg_log_error("pgpipe: getsockname() failed: error code %d",
1765  WSAGetLastError());
1766  closesocket(s);
1767  return -1;
1768  }
1769 
1770  /*
1771  * setup pipe handles
1772  */
1773  if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1774  {
1775  pg_log_error("pgpipe: could not create second socket: error code %d",
1776  WSAGetLastError());
1777  closesocket(s);
1778  return -1;
1779  }
1780  handles[1] = (int) tmp_sock;
1781 
1782  if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1783  {
1784  pg_log_error("pgpipe: could not connect socket: error code %d",
1785  WSAGetLastError());
1786  closesocket(handles[1]);
1787  handles[1] = -1;
1788  closesocket(s);
1789  return -1;
1790  }
1791  if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1792  {
1793  pg_log_error("pgpipe: could not accept connection: error code %d",
1794  WSAGetLastError());
1795  closesocket(handles[1]);
1796  handles[1] = -1;
1797  closesocket(s);
1798  return -1;
1799  }
1800  handles[0] = (int) tmp_sock;
1801 
1802  closesocket(s);
1803  return 0;
1804 }
1805 
1806 #endif /* WIN32 */
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static PgChecksumMode mode
Definition: pg_checksums.c:61
int pipeRead
Definition: parallel.c:105
int DumpId
Definition: pg_backup.h:244
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
Definition: parallel.c:1113
#define SIGQUIT
Definition: win32_port.h:160
PQExpBufferData * PQExpBuffer
Definition: pqexpbuffer.h:51
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition: parallel.c:1210
#define accept(s, addr, addrlen)
Definition: win32_port.h:462
static void set_cancel_pstate(ParallelState *pstate)
Definition: parallel.c:794
struct DumpSignalInformation DumpSignalInformation
ArchiveHandle * AH
Definition: parallel.c:103
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1273
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
void * callback_data
Definition: parallel.c:101
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel.c:1545
#define pg_log_error(...)
Definition: logging.h:80
struct WorkerInfoData * WorkerInfo
Definition: autovacuum.c:234
#define closesocket
Definition: port.h:331
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
Definition: parallel.c:1161
void PQfreeCancel(PGcancel *cancel)
Definition: fe-connect.c:4318
#define pg_hton16(x)
Definition: pg_bswap.h:120
static int GetIdleWorker(ParallelState *pstate)
Definition: parallel.c:1241
struct ShutdownInformation ShutdownInformation
ArchiveHandle * myAH
Definition: parallel.c:167
#define write_stderr(str)
Definition: parallel.c:186
#define PIPE_READ
Definition: parallel.c:71
static bool HasEveryWorkerTerminated(ParallelState *pstate)
Definition: parallel.c:1257
#define kill(pid, sig)
Definition: win32_port.h:454
static void setup_cancel_handler(void)
Definition: parallel.c:613
#define connect(s, name, namelen)
Definition: win32_port.h:463
#define SIGPIPE
Definition: win32_port.h:164
int n_errors
Definition: pg_backup.h:216
PGcancel *volatile connCancel
static void ShutdownWorkersHard(ParallelState *pstate)
Definition: parallel.c:400
void on_exit_close_archive(Archive *AHX)
Definition: parallel.c:333
const char * progname
Definition: pg_standby.c:36
#define bind(s, addr, addrlen)
Definition: win32_port.h:460
SetupWorkerPtrType SetupWorkerPtr
static int fd(const char *x, int i)
Definition: preproc-init.c:105
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:2692
#define WORKER_IGNORED_ERRORS
void DeCloneArchive(ArchiveHandle *AH)
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:116
ParallelState * pstate
Definition: parallel.c:168
#define pg_hton32(x)
Definition: pg_bswap.h:121
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
Definition: parallel.c:269
PGconn * conn
Definition: streamutil.c:54
pid_t pid
Definition: parallel.c:115
#define NO_SLOT
Definition: parallel.c:74
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:267
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
Definition: parallel.c:834
WFW_WaitOption
Definition: parallel.h:28
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
Definition: parallel.h:22
static char * buf
Definition: pg_test_fsync.c:68
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
int pipeRevRead
Definition: parallel.c:107
static char * readMessageFromPipe(int fd)
Definition: parallel.c:1667
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-connect.c:4295
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
#define pgpipe(a)
Definition: parallel.c:139
#define select(n, r, w, e, timeout)
Definition: win32_port.h:464
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
Definition: parallel.c:1584
static void sendMessageToLeader(int pipefd[2], const char *str)
Definition: parallel.c:1532
int pgsocket
Definition: port.h:31
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:232
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition: parallel.c:902
#define SIG_IGN
Definition: win32_port.h:156
ParallelSlot * parallelSlot
Definition: parallel.h:45
#define listen(s, backlog)
Definition: win32_port.h:461
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition: parallel.c:1064
#define exit_nicely(code)
Definition: pg_dumpall.c:96
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
Definition: parallel.c:1649
ParallelState * pstate
Definition: parallel.c:150
#define socket(af, type, protocol)
Definition: win32_port.h:459
#define PGINVALID_SOCKET
Definition: port.h:33
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:74
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
Definition: parallel.c:1403
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
Definition: parallel.c:1306
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition: parallel.c:1456
void PQclear(PGresult *res)
Definition: fe-exec.c:694
void init_parallel_dump_utils(void)
Definition: parallel.c:238
pqsigfunc pqsignal(int signum, pqsigfunc handler)
Definition: signal.c:170
#define free(a)
Definition: header.h:65
T_WorkerStatus
Definition: parallel.c:77
static ShutdownInformation shutdown_info
Definition: parallel.c:154
#define SIGNAL_ARGS
Definition: c.h:1275
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
Definition: parallel.c:1128
#define Assert(condition)
Definition: c.h:746
TocEntry ** te
Definition: parallel.h:44
ParallelCompletionPtr callback
Definition: parallel.c:100
int pipeRevWrite
Definition: parallel.c:108
PQExpBuffer(* getLocalPQExpBuffer)(void)
Definition: string_utils.c:27
const char * fmtQualifiedId(const char *schema, const char *id)
Definition: string_utils.c:145
#define fatal(...)
int numWorkers
Definition: pg_backup.h:203
void pg_free(void *ptr)
Definition: fe_memutils.c:105
static volatile DumpSignalInformation signal_info
Definition: parallel.c:175
T_WorkerStatus workerStatus
Definition: parallel.c:97
#define piperead(a, b, c)
Definition: parallel.c:140
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:85
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-connect.c:4450
static bool do_wait
Definition: pg_ctl.c:79
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
Definition: parallel.c:735
static char * getMessageFromLeader(int pipefd[2])
Definition: parallel.c:1521
int pipeWrite
Definition: parallel.c:106
int i
WorkerJobRestorePtrType WorkerJobRestorePtr
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
Definition: parallel.c:814
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:1939
void * arg
#define pipewrite(a, b, c)
Definition: parallel.c:141
static void WaitForTerminatingWorkers(ParallelState *pstate)
Definition: parallel.c:451
static void sigTermHandler(SIGNAL_ARGS)
Definition: parallel.c:550
#define EINTR
Definition: win32_port.h:343
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
Definition: parallel.c:1176
static void archive_close_connection(int code, void *arg)
Definition: parallel.c:344
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:148
#define PIPE_WRITE
Definition: parallel.c:72
static void static void status(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:227
#define snprintf
Definition: port.h:215
int numWorkers
Definition: parallel.h:42
#define messageStartsWith(msg, prefix)
Definition: parallel.c:228
WorkerJobDumpPtrType WorkerJobDumpPtr
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
Definition: parallel.c:1341