PostgreSQL Source Code  git master
parallel.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *
5  * Parallel support for pg_dump and pg_restore
6  *
7  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  * src/bin/pg_dump/parallel.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 /*
17  * Parallel operation works like this:
18  *
19  * The original, leader process calls ParallelBackupStart(), which forks off
20  * the desired number of worker processes, which each enter WaitForCommands().
21  *
22  * The leader process dispatches an individual work item to one of the worker
23  * processes in DispatchJobForTocEntry(). We send a command string such as
24  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25  * The worker process receives and decodes the command and passes it to the
26  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27  * which are routines of the current archive format. That routine performs
28  * the required action (dump or restore) and returns an integer status code.
29  * This is passed back to the leader where we pass it to the
30  * ParallelCompletionPtr callback function that was passed to
31  * DispatchJobForTocEntry(). The callback function does state updating
32  * for the leader control logic in pg_backup_archiver.c.
33  *
34  * In principle additional archive-format-specific information might be needed
35  * in commands or worker status responses, but so far that hasn't proved
36  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37  * data structures. Remember that we have forked off the workers only after
38  * we have read in the catalog. That's why our worker processes can also
39  * access the catalog information. (In the Windows case, the workers are
40  * threads in the same process. To avoid problems, they work with cloned
41  * copies of the Archive data structure; see RunWorker().)
42  *
43  * In the leader process, the workerStatus field for each worker has one of
44  * the following values:
45  * WRKR_NOT_STARTED: we've not yet forked this worker
46  * WRKR_IDLE: it's waiting for a command
47  * WRKR_WORKING: it's working on a command
48  * WRKR_TERMINATED: process ended
49  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50  * state, and must be NULL in other states.
51  */
52 
53 #include "postgres_fe.h"
54 
55 #ifndef WIN32
56 #include <sys/select.h>
57 #include <sys/wait.h>
58 #include <signal.h>
59 #include <unistd.h>
60 #include <fcntl.h>
61 #endif
62 
63 #include "fe_utils/string_utils.h"
64 #include "parallel.h"
65 #include "pg_backup_utils.h"
66 #include "port/pg_bswap.h"
67 
/*
 * Indexes into the two-element fd array filled in by pipe(2) (or by pgpipe()
 * on Windows): element 0 is the read end, element 1 the write end.
 */
enum
{
	PIPE_READ = 0,
	PIPE_WRITE = 1
};

/* Failure result returned by GetIdleWorker(). */
enum
{
	NO_SLOT = -1
};
73 
74 /* Worker process statuses */
75 typedef enum
76 {
82 
/*
 * True when a worker is either idle (waiting for a command) or busy with a
 * command, i.e. it has been started and has not yet terminated.
 */
#define WORKER_IS_RUNNING(workerStatus) \
	(((workerStatus) == WRKR_IDLE) || \
	 ((workerStatus) == WRKR_WORKING))
85 
86 /*
87  * Private per-parallel-worker state (typedef for this is in parallel.h).
88  *
89  * Much of this is valid only in the leader process (or, on Windows, should
90  * be touched only by the leader thread). But the AH field should be touched
91  * only by workers. The pipe descriptors are valid everywhere.
92  */
94 {
95  T_WorkerStatus workerStatus; /* see enum above */
96 
97  /* These fields are valid if workerStatus == WRKR_WORKING: */
98  ParallelCompletionPtr callback; /* function to call on completion */
99  void *callback_data; /* passthrough data for it */
100 
101  ArchiveHandle *AH; /* Archive data worker is using */
102 
103  int pipeRead; /* leader's end of the pipes */
105  int pipeRevRead; /* child's end of the pipes */
107 
108  /* Child process/thread identity info: */
109 #ifdef WIN32
110  uintptr_t hThread;
111  unsigned int threadId;
112 #else
113  pid_t pid;
114 #endif
115 };
116 
/*
 * Portable pipe access.  Callers create a channel with pgpipe() and move
 * bytes across it with piperead()/pipewrite().
 */
#ifndef WIN32

/* Unix: ordinary pipes driven by plain read(2)/write(2). */
#define pgpipe(fds)					pipe(fds)
#define piperead(fd, buf, len)		read(fd, buf, len)
#define pipewrite(fd, buf, len)		write(fd, buf, len)

#else							/* WIN32 */

/*
 * Argument bundle for the worker thread entry point: _beginthreadex() hands
 * exactly one pointer to the function it starts, so both values travel
 * together in this struct.
 */
typedef struct
{
	ArchiveHandle *AH;			/* leader database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/* Windows: pgpipe() is a static function of this file; I/O uses sockets. */
static int	pgpipe(int handles[2]);
#define piperead(fd, buf, len)		recv(fd, buf, len, 0)
#define pipewrite(fd, buf, len)		send(fd, buf, len, 0)

#endif							/* WIN32 */
147 {
151 
153 
154 /*
155  * State info for signal handling.
156  * We assume signal_info initializes to zeroes.
157  *
158  * On Unix, myAH is the leader DB connection in the leader process, and the
159  * worker's own connection in worker processes. On Windows, we have only one
160  * instance of signal_info, so myAH is the leader connection and the worker
161  * connections must be dug out of pstate->parallelSlot[].
162  */
163 typedef struct DumpSignalInformation
164 {
165  ArchiveHandle *myAH; /* database connection to issue cancel for */
166  ParallelState *pstate; /* parallel state, if any */
167  bool handler_set; /* signal handler set up in this process? */
168 #ifndef WIN32
169  bool am_worker; /* am I a worker process? */
170 #endif
172 
174 
175 #ifdef WIN32
176 static CRITICAL_SECTION signal_info_lock;
177 #endif
178 
/*
 * Emit a NUL-terminated string on stderr using nothing but write(2), so the
 * macro can be used from a signal handler.  A short or failed write is
 * deliberately ignored, since there is no useful recovery.  The result is
 * parked in a local and then discarded because certain compilers complain
 * about a directly ignored write() result.
 */
#define write_stderr(str) \
	do { \
		const char *ws_msg_ = (str); \
		int			ws_rc_ = write(fileno(stderr), ws_msg_, strlen(ws_msg_)); \
		(void) ws_rc_; \
	} while (0)
191 
192 
193 #ifdef WIN32
194 /* file-scope variables */
195 static DWORD tls_index;
196 
197 /* globally visible variables (needed by exit_nicely) */
198 bool parallel_init_done = false;
199 DWORD mainThreadId;
200 #endif /* WIN32 */
201 
202 /* Local function prototypes */
203 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
204 static void archive_close_connection(int code, void *arg);
205 static void ShutdownWorkersHard(ParallelState *pstate);
206 static void WaitForTerminatingWorkers(ParallelState *pstate);
207 static void set_cancel_handler(void);
208 static void set_cancel_pstate(ParallelState *pstate);
209 static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
210 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
211 static int GetIdleWorker(ParallelState *pstate);
212 static bool HasEveryWorkerTerminated(ParallelState *pstate);
213 static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
214 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
215 static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
216  bool do_wait);
217 static char *getMessageFromLeader(int pipefd[2]);
218 static void sendMessageToLeader(int pipefd[2], const char *str);
219 static int select_loop(int maxFd, fd_set *workerset);
220 static char *getMessageFromWorker(ParallelState *pstate,
221  bool do_wait, int *worker);
222 static void sendMessageToWorker(ParallelState *pstate,
223  int worker, const char *str);
224 static char *readMessageFromPipe(int fd);
225 
/* Nonzero when C string msg begins with C string prefix. */
#define messageStartsWith(msg, prefix) \
	(!strncmp((msg), (prefix), strlen(prefix)))
228 
229 
230 /*
231  * Initialize parallel dump support --- should be called early in process
232  * startup. (Currently, this is called whether or not we intend parallel
233  * activity.)
234  */
235 void
237 {
238 #ifdef WIN32
239  if (!parallel_init_done)
240  {
241  WSADATA wsaData;
242  int err;
243 
244  /* Prepare for threaded operation */
245  tls_index = TlsAlloc();
246  mainThreadId = GetCurrentThreadId();
247 
248  /* Initialize socket access */
249  err = WSAStartup(MAKEWORD(2, 2), &wsaData);
250  if (err != 0)
251  pg_fatal("%s() failed: error code %d", "WSAStartup", err);
252 
253  parallel_init_done = true;
254  }
255 #endif
256 }
257 
258 /*
259  * Find the ParallelSlot for the current worker process or thread.
260  *
261  * Returns NULL if no matching slot is found (this implies we're the leader).
262  */
263 static ParallelSlot *
265 {
266  int i;
267 
268  for (i = 0; i < pstate->numWorkers; i++)
269  {
270 #ifdef WIN32
271  if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
272 #else
273  if (pstate->parallelSlot[i].pid == getpid())
274 #endif
275  return &(pstate->parallelSlot[i]);
276  }
277 
278  return NULL;
279 }
280 
/*
 * getThreadLocalPQExpBuffer
 *
 * Thread-local replacement for getLocalPQExpBuffer(), installed on Windows
 * when the parallel worker threads are started so that fmtId() and
 * fmtQualifiedId() calls in different threads do not share one buffer.
 *
 * Every call from a given thread returns that thread's single buffer, emptied
 * first, so a result is only valid until the next call (not reentrant).
 * Keeping one buffer per thread, rather than one per fmtId/fmtQualifiedId
 * call, limits memory leakage.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * Until parallel_init_done is set (tls_index is allocated at the same
	 * time), fall back to a plain static variable.  Afterwards use only the
	 * TLS slot; the Tls code misbehaves if a static is mixed in.  TlsGetValue()
	 * yields NULL (0) for a slot that was never set.
	 */
	static PQExpBuffer static_buf = NULL;
	PQExpBuffer buf = parallel_init_done ?
		(PQExpBuffer) TlsGetValue(tls_index) : static_buf;

	if (buf == NULL)
	{
		/* First call in this thread: allocate a buffer and remember it. */
		buf = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, buf);
		else
			static_buf = buf;
		return buf;
	}

	/* Later calls: reuse the existing buffer, discarding old contents. */
	resetPQExpBuffer(buf);
	return buf;
}
#endif							/* WIN32 */
322 
323 /*
324  * pg_dump and pg_restore call this to register the cleanup handler
325  * as soon as they've created the ArchiveHandle.
326  */
327 void
329 {
330  shutdown_info.AHX = AHX;
332 }
333 
334 /*
335  * on_exit_nicely handler for shutting down database connections and
336  * worker processes cleanly.
337  */
338 static void
340 {
342 
343  if (si->pstate)
344  {
345  /* In parallel mode, must figure out who we are */
346  ParallelSlot *slot = GetMyPSlot(si->pstate);
347 
348  if (!slot)
349  {
350  /*
351  * We're the leader. Forcibly shut down workers, then close our
352  * own database connection, if any.
353  */
355 
356  if (si->AHX)
357  DisconnectDatabase(si->AHX);
358  }
359  else
360  {
361  /*
362  * We're a worker. Shut down our own DB connection if any. On
363  * Windows, we also have to close our communication sockets, to
364  * emulate what will happen on Unix when the worker process exits.
365  * (Without this, if this is a premature exit, the leader would
366  * fail to detect it because there would be no EOF condition on
367  * the other end of the pipe.)
368  */
369  if (slot->AH)
370  DisconnectDatabase(&(slot->AH->public));
371 
372 #ifdef WIN32
373  closesocket(slot->pipeRevRead);
374  closesocket(slot->pipeRevWrite);
375 #endif
376  }
377  }
378  else
379  {
380  /* Non-parallel operation: just kill the leader DB connection */
381  if (si->AHX)
382  DisconnectDatabase(si->AHX);
383  }
384 }
385 
386 /*
387  * Forcibly shut down any remaining workers, waiting for them to finish.
388  *
389  * Note that we don't expect to come here during normal exit (the workers
390  * should be long gone, and the ParallelState too). We're only here in a
391  * pg_fatal() situation, so intervening to cancel active commands is
392  * appropriate.
393  */
394 static void
396 {
397  int i;
398 
399  /*
400  * Close our write end of the sockets so that any workers waiting for
401  * commands know they can exit. (Note: some of the pipeWrite fields might
402  * still be zero, if we failed to initialize all the workers. Hence, just
403  * ignore errors here.)
404  */
405  for (i = 0; i < pstate->numWorkers; i++)
407 
408  /*
409  * Force early termination of any commands currently in progress.
410  */
411 #ifndef WIN32
412  /* On non-Windows, send SIGTERM to each worker process. */
413  for (i = 0; i < pstate->numWorkers; i++)
414  {
415  pid_t pid = pstate->parallelSlot[i].pid;
416 
417  if (pid != 0)
418  kill(pid, SIGTERM);
419  }
420 #else
421 
422  /*
423  * On Windows, send query cancels directly to the workers' backends. Use
424  * a critical section to ensure worker threads don't change state.
425  */
426  EnterCriticalSection(&signal_info_lock);
427  for (i = 0; i < pstate->numWorkers; i++)
428  {
429  ArchiveHandle *AH = pstate->parallelSlot[i].AH;
430  char errbuf[1];
431 
432  if (AH != NULL && AH->connCancel != NULL)
433  (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
434  }
435  LeaveCriticalSection(&signal_info_lock);
436 #endif
437 
438  /* Now wait for them to terminate. */
440 }
441 
442 /*
443  * Wait for all workers to terminate.
444  */
445 static void
447 {
448  while (!HasEveryWorkerTerminated(pstate))
449  {
450  ParallelSlot *slot = NULL;
451  int j;
452 
453 #ifndef WIN32
454  /* On non-Windows, use wait() to wait for next worker to end */
455  int status;
456  pid_t pid = wait(&status);
457 
458  /* Find dead worker's slot, and clear the PID field */
459  for (j = 0; j < pstate->numWorkers; j++)
460  {
461  slot = &(pstate->parallelSlot[j]);
462  if (slot->pid == pid)
463  {
464  slot->pid = 0;
465  break;
466  }
467  }
468 #else /* WIN32 */
469  /* On Windows, we must use WaitForMultipleObjects() */
470  HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
471  int nrun = 0;
472  DWORD ret;
473  uintptr_t hThread;
474 
475  for (j = 0; j < pstate->numWorkers; j++)
476  {
478  {
479  lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
480  nrun++;
481  }
482  }
483  ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
484  Assert(ret != WAIT_FAILED);
485  hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
486  free(lpHandles);
487 
488  /* Find dead worker's slot, and clear the hThread field */
489  for (j = 0; j < pstate->numWorkers; j++)
490  {
491  slot = &(pstate->parallelSlot[j]);
492  if (slot->hThread == hThread)
493  {
494  /* For cleanliness, close handles for dead threads */
495  CloseHandle((HANDLE) slot->hThread);
496  slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
497  break;
498  }
499  }
500 #endif /* WIN32 */
501 
502  /* On all platforms, update workerStatus and te[] as well */
503  Assert(j < pstate->numWorkers);
505  pstate->te[j] = NULL;
506  }
507 }
508 
509 
510 /*
511  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
512  *
513  * This doesn't quite belong in this module, but it needs access to the
514  * ParallelState data, so there's not really a better place either.
515  *
516  * When we get a cancel interrupt, we could just die, but in pg_restore that
517  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
518  * for a long time. Instead, we try to send a cancel request and then die.
519  * pg_dump probably doesn't really need this, but we might as well use it
520  * there too. Note that sending the cancel directly from the signal handler
521  * is safe because PQcancel() is written to make it so.
522  *
523  * In parallel operation on Unix, each process is responsible for canceling
524  * its own connection (this must be so because nobody else has access to it).
525  * Furthermore, the leader process should attempt to forward its signal to
526  * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
527  * needed because typing control-C at the console would deliver SIGINT to
528  * every member of the terminal process group --- but in other scenarios it
529  * might be that only the leader gets signaled.
530  *
531  * On Windows, the cancel handler runs in a separate thread, because that's
532  * how SetConsoleCtrlHandler works. We make it stop worker threads, send
533  * cancels on all active connections, and then return FALSE, which will allow
534  * the process to die. For safety's sake, we use a critical section to
535  * protect the PGcancel structures against being changed while the signal
536  * thread runs.
537  */
538 
539 #ifndef WIN32
540 
541 /*
542  * Signal handler (Unix only)
543  */
544 static void
546 {
547  int i;
548  char errbuf[1];
549 
550  /*
551  * Some platforms allow delivery of new signals to interrupt an active
552  * signal handler. That could muck up our attempt to send PQcancel, so
553  * disable the signals that set_cancel_handler enabled.
554  */
555  pqsignal(SIGINT, SIG_IGN);
556  pqsignal(SIGTERM, SIG_IGN);
558 
559  /*
560  * If we're in the leader, forward signal to all workers. (It seems best
561  * to do this before PQcancel; killing the leader transaction will result
562  * in invalid-snapshot errors from active workers, which maybe we can
563  * quiet by killing workers first.) Ignore any errors.
564  */
565  if (signal_info.pstate != NULL)
566  {
567  for (i = 0; i < signal_info.pstate->numWorkers; i++)
568  {
569  pid_t pid = signal_info.pstate->parallelSlot[i].pid;
570 
571  if (pid != 0)
572  kill(pid, SIGTERM);
573  }
574  }
575 
576  /*
577  * Send QueryCancel if we have a connection to send to. Ignore errors,
578  * there's not much we can do about them anyway.
579  */
580  if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
581  (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
582 
583  /*
584  * Report we're quitting, using nothing more complicated than write(2).
585  * When in parallel operation, only the leader process should do this.
586  */
587  if (!signal_info.am_worker)
588  {
589  if (progname)
590  {
592  write_stderr(": ");
593  }
594  write_stderr("terminated by user\n");
595  }
596 
597  /*
598  * And die, using _exit() not exit() because the latter will invoke atexit
599  * handlers that can fail if we interrupted related code.
600  */
601  _exit(1);
602 }
603 
604 /*
605  * Enable cancel interrupt handler, if not already done.
606  */
607 static void
609 {
610  /*
611  * When forking, signal_info.handler_set will propagate into the new
612  * process, but that's fine because the signal handler state does too.
613  */
615  {
616  signal_info.handler_set = true;
617 
618  pqsignal(SIGINT, sigTermHandler);
619  pqsignal(SIGTERM, sigTermHandler);
621  }
622 }
623 
624 #else /* WIN32 */
625 
626 /*
627  * Console interrupt handler --- runs in a newly-started thread.
628  *
629  * After stopping other threads and sending cancel requests on all open
630  * connections, we return FALSE which will allow the default ExitProcess()
631  * action to be taken.
632  */
633 static BOOL WINAPI
634 consoleHandler(DWORD dwCtrlType)
635 {
636  int i;
637  char errbuf[1];
638 
639  if (dwCtrlType == CTRL_C_EVENT ||
640  dwCtrlType == CTRL_BREAK_EVENT)
641  {
642  /* Critical section prevents changing data we look at here */
643  EnterCriticalSection(&signal_info_lock);
644 
645  /*
646  * If in parallel mode, stop worker threads and send QueryCancel to
647  * their connected backends. The main point of stopping the worker
648  * threads is to keep them from reporting the query cancels as errors,
649  * which would clutter the user's screen. We needn't stop the leader
650  * thread since it won't be doing much anyway. Do this before
651  * canceling the main transaction, else we might get invalid-snapshot
652  * errors reported before we can stop the workers. Ignore errors,
653  * there's not much we can do about them anyway.
654  */
655  if (signal_info.pstate != NULL)
656  {
657  for (i = 0; i < signal_info.pstate->numWorkers; i++)
658  {
660  ArchiveHandle *AH = slot->AH;
661  HANDLE hThread = (HANDLE) slot->hThread;
662 
663  /*
664  * Using TerminateThread here may leave some resources leaked,
665  * but it doesn't matter since we're about to end the whole
666  * process.
667  */
668  if (hThread != INVALID_HANDLE_VALUE)
669  TerminateThread(hThread, 0);
670 
671  if (AH != NULL && AH->connCancel != NULL)
672  (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
673  }
674  }
675 
676  /*
677  * Send QueryCancel to leader connection, if enabled. Ignore errors,
678  * there's not much we can do about them anyway.
679  */
680  if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
682  errbuf, sizeof(errbuf));
683 
684  LeaveCriticalSection(&signal_info_lock);
685 
686  /*
687  * Report we're quitting, using nothing more complicated than
688  * write(2). (We might be able to get away with using pg_log_*()
689  * here, but since we terminated other threads uncleanly above, it
690  * seems better to assume as little as possible.)
691  */
692  if (progname)
693  {
695  write_stderr(": ");
696  }
697  write_stderr("terminated by user\n");
698  }
699 
700  /* Always return FALSE to allow signal handling to continue */
701  return FALSE;
702 }
703 
704 /*
705  * Enable cancel interrupt handler, if not already done.
706  */
707 static void
708 set_cancel_handler(void)
709 {
711  {
712  signal_info.handler_set = true;
713 
714  InitializeCriticalSection(&signal_info_lock);
715 
716  SetConsoleCtrlHandler(consoleHandler, TRUE);
717  }
718 }
719 
720 #endif /* WIN32 */
721 
722 
723 /*
724  * set_archive_cancel_info
725  *
726  * Fill AH->connCancel with cancellation info for the specified database
727  * connection; or clear it if conn is NULL.
728  */
729 void
731 {
732  PGcancel *oldConnCancel;
733 
734  /*
735  * Activate the interrupt handler if we didn't yet in this process. On
736  * Windows, this also initializes signal_info_lock; therefore it's
737  * important that this happen at least once before we fork off any
738  * threads.
739  */
741 
742  /*
743  * On Unix, we assume that storing a pointer value is atomic with respect
744  * to any possible signal interrupt. On Windows, use a critical section.
745  */
746 
747 #ifdef WIN32
748  EnterCriticalSection(&signal_info_lock);
749 #endif
750 
751  /* Free the old one if we have one */
752  oldConnCancel = AH->connCancel;
753  /* be sure interrupt handler doesn't use pointer while freeing */
754  AH->connCancel = NULL;
755 
756  if (oldConnCancel != NULL)
757  PQfreeCancel(oldConnCancel);
758 
759  /* Set the new one if specified */
760  if (conn)
761  AH->connCancel = PQgetCancel(conn);
762 
763  /*
764  * On Unix, there's only ever one active ArchiveHandle per process, so we
765  * can just set signal_info.myAH unconditionally. On Windows, do that
766  * only in the main thread; worker threads have to make sure their
767  * ArchiveHandle appears in the pstate data, which is dealt with in
768  * RunWorker().
769  */
770 #ifndef WIN32
771  signal_info.myAH = AH;
772 #else
773  if (mainThreadId == GetCurrentThreadId())
774  signal_info.myAH = AH;
775 #endif
776 
777 #ifdef WIN32
778  LeaveCriticalSection(&signal_info_lock);
779 #endif
780 }
781 
782 /*
783  * set_cancel_pstate
784  *
785  * Set signal_info.pstate to point to the specified ParallelState, if any.
786  * We need this mainly to have an interlock against Windows signal thread.
787  */
788 static void
790 {
791 #ifdef WIN32
792  EnterCriticalSection(&signal_info_lock);
793 #endif
794 
795  signal_info.pstate = pstate;
796 
797 #ifdef WIN32
798  LeaveCriticalSection(&signal_info_lock);
799 #endif
800 }
801 
802 /*
803  * set_cancel_slot_archive
804  *
805  * Set ParallelSlot's AH field to point to the specified archive, if any.
806  * We need this mainly to have an interlock against Windows signal thread.
807  */
808 static void
810 {
811 #ifdef WIN32
812  EnterCriticalSection(&signal_info_lock);
813 #endif
814 
815  slot->AH = AH;
816 
817 #ifdef WIN32
818  LeaveCriticalSection(&signal_info_lock);
819 #endif
820 }
821 
822 
823 /*
824  * This function is called by both Unix and Windows variants to set up
825  * and run a worker process. Caller should exit the process (or thread)
826  * upon return.
827  */
828 static void
830 {
831  int pipefd[2];
832 
833  /* fetch child ends of pipes */
834  pipefd[PIPE_READ] = slot->pipeRevRead;
835  pipefd[PIPE_WRITE] = slot->pipeRevWrite;
836 
837  /*
838  * Clone the archive so that we have our own state to work with, and in
839  * particular our own database connection.
840  *
841  * We clone on Unix as well as Windows, even though technically we don't
842  * need to because fork() gives us a copy in our own address space
843  * already. But CloneArchive resets the state information and also clones
844  * the database connection which both seem kinda helpful.
845  */
846  AH = CloneArchive(AH);
847 
848  /* Remember cloned archive where signal handler can find it */
849  set_cancel_slot_archive(slot, AH);
850 
851  /*
852  * Call the setup worker function that's defined in the ArchiveHandle.
853  */
854  (AH->SetupWorkerPtr) ((Archive *) AH);
855 
856  /*
857  * Execute commands until done.
858  */
859  WaitForCommands(AH, pipefd);
860 
861  /*
862  * Disconnect from database and clean up.
863  */
864  set_cancel_slot_archive(slot, NULL);
865  DisconnectDatabase(&(AH->public));
866  DeCloneArchive(AH);
867 }
868 
/*
 * init_spawned_worker_win32
 *
 * Entry point of a Windows worker thread started with _beginthreadex().
 * Unpacks and frees the heap-allocated WorkerInfo, runs the worker, and then
 * ends the thread with exit code 0.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	/* Copy the arguments out before releasing their container. */
	ArchiveHandle *leaderAH = wi->AH;
	ParallelSlot *mySlot = wi->slot;

	free(wi);

	RunWorker(leaderAH, mySlot);

	/* _endthreadex() ends the thread; the return only satisfies the compiler. */
	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
890 
891 /*
892  * This function starts a parallel dump or restore by spawning off the worker
893  * processes. For Windows, it creates a number of threads; on Unix the
894  * workers are created with fork().
895  */
898 {
899  ParallelState *pstate;
900  int i;
901 
902  Assert(AH->public.numWorkers > 0);
903 
904  pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
905 
906  pstate->numWorkers = AH->public.numWorkers;
907  pstate->te = NULL;
908  pstate->parallelSlot = NULL;
909 
910  if (AH->public.numWorkers == 1)
911  return pstate;
912 
913  /* Create status arrays, being sure to initialize all fields to 0 */
914  pstate->te = (TocEntry **)
915  pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
916  pstate->parallelSlot = (ParallelSlot *)
917  pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
918 
919 #ifdef WIN32
920  /* Make fmtId() and fmtQualifiedId() use thread-local storage */
921  getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
922 #endif
923 
924  /*
925  * Set the pstate in shutdown_info, to tell the exit handler that it must
926  * clean up workers as well as the main database connection. But we don't
927  * set this in signal_info yet, because we don't want child processes to
928  * inherit non-NULL signal_info.pstate.
929  */
930  shutdown_info.pstate = pstate;
931 
932  /*
933  * Temporarily disable query cancellation on the leader connection. This
934  * ensures that child processes won't inherit valid AH->connCancel
935  * settings and thus won't try to issue cancels against the leader's
936  * connection. No harm is done if we fail while it's disabled, because
937  * the leader connection is idle at this point anyway.
938  */
939  set_archive_cancel_info(AH, NULL);
940 
941  /* Ensure stdio state is quiesced before forking */
942  fflush(NULL);
943 
944  /* Create desired number of workers */
945  for (i = 0; i < pstate->numWorkers; i++)
946  {
947 #ifdef WIN32
948  WorkerInfo *wi;
949  uintptr_t handle;
950 #else
951  pid_t pid;
952 #endif
953  ParallelSlot *slot = &(pstate->parallelSlot[i]);
954  int pipeMW[2],
955  pipeWM[2];
956 
957  /* Create communication pipes for this worker */
958  if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
959  pg_fatal("could not create communication channels: %m");
960 
961  /* leader's ends of the pipes */
962  slot->pipeRead = pipeWM[PIPE_READ];
963  slot->pipeWrite = pipeMW[PIPE_WRITE];
964  /* child's ends of the pipes */
965  slot->pipeRevRead = pipeMW[PIPE_READ];
966  slot->pipeRevWrite = pipeWM[PIPE_WRITE];
967 
968 #ifdef WIN32
969  /* Create transient structure to pass args to worker function */
970  wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
971 
972  wi->AH = AH;
973  wi->slot = slot;
974 
975  handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
976  wi, 0, &(slot->threadId));
977  slot->hThread = handle;
978  slot->workerStatus = WRKR_IDLE;
979 #else /* !WIN32 */
980  pid = fork();
981  if (pid == 0)
982  {
983  /* we are the worker */
984  int j;
985 
986  /* this is needed for GetMyPSlot() */
987  slot->pid = getpid();
988 
989  /* instruct signal handler that we're in a worker now */
990  signal_info.am_worker = true;
991 
992  /* close read end of Worker -> Leader */
993  closesocket(pipeWM[PIPE_READ]);
994  /* close write end of Leader -> Worker */
995  closesocket(pipeMW[PIPE_WRITE]);
996 
997  /*
998  * Close all inherited fds for communication of the leader with
999  * previously-forked workers.
1000  */
1001  for (j = 0; j < i; j++)
1002  {
1003  closesocket(pstate->parallelSlot[j].pipeRead);
1004  closesocket(pstate->parallelSlot[j].pipeWrite);
1005  }
1006 
1007  /* Run the worker ... */
1008  RunWorker(AH, slot);
1009 
1010  /* We can just exit(0) when done */
1011  exit(0);
1012  }
1013  else if (pid < 0)
1014  {
1015  /* fork failed */
1016  pg_fatal("could not create worker process: %m");
1017  }
1018 
1019  /* In Leader after successful fork */
1020  slot->pid = pid;
1021  slot->workerStatus = WRKR_IDLE;
1022 
1023  /* close read end of Leader -> Worker */
1024  closesocket(pipeMW[PIPE_READ]);
1025  /* close write end of Worker -> Leader */
1026  closesocket(pipeWM[PIPE_WRITE]);
1027 #endif /* WIN32 */
1028  }
1029 
1030  /*
1031  * Having forked off the workers, disable SIGPIPE so that leader isn't
1032  * killed if it tries to send a command to a dead worker. We don't want
1033  * the workers to inherit this setting, though.
1034  */
1035 #ifndef WIN32
1037 #endif
1038 
1039  /*
1040  * Re-establish query cancellation on the leader connection.
1041  */
1043 
1044  /*
1045  * Tell the cancel signal handler to forward signals to worker processes,
1046  * too. (As with query cancel, we did not need this earlier because the
1047  * workers have not yet been given anything to do; if we die before this
1048  * point, any already-started workers will see EOF and quit promptly.)
1049  */
1050  set_cancel_pstate(pstate);
1051 
1052  return pstate;
1053 }
1054 
1055 /*
1056  * Close down a parallel dump or restore.
1057  */
1058 void
1060 {
1061  int i;
1062 
1063  /* No work if non-parallel */
1064  if (pstate->numWorkers == 1)
1065  return;
1066 
1067  /* There should not be any unfinished jobs */
1068  Assert(IsEveryWorkerIdle(pstate));
1069 
1070  /* Close the sockets so that the workers know they can exit */
1071  for (i = 0; i < pstate->numWorkers; i++)
1072  {
1073  closesocket(pstate->parallelSlot[i].pipeRead);
1074  closesocket(pstate->parallelSlot[i].pipeWrite);
1075  }
1076 
1077  /* Wait for them to exit */
1078  WaitForTerminatingWorkers(pstate);
1079 
1080  /*
1081  * Unlink pstate from shutdown_info, so the exit handler will not try to
1082  * use it; and likewise unlink from signal_info.
1083  */
1084  shutdown_info.pstate = NULL;
1085  set_cancel_pstate(NULL);
1086 
1087  /* Release state (mere neatnik-ism, since we're about to terminate) */
1088  free(pstate->te);
1089  free(pstate->parallelSlot);
1090  free(pstate);
1091 }
1092 
1093 /*
1094  * These next four functions handle construction and parsing of the command
1095  * strings and response strings for parallel workers.
1096  *
1097  * Currently, these can be the same regardless of which archive format we are
1098  * processing. In future, we might want to let format modules override these
1099  * functions to add format-specific data to a command or response.
1100  */
1101 
1102 /*
1103  * buildWorkerCommand: format a command string to send to a worker.
1104  *
1105  * The string is built in the caller-supplied buffer of size buflen.
1106  */
1107 static void
1109  char *buf, int buflen)
1110 {
1111  if (act == ACT_DUMP)
1112  snprintf(buf, buflen, "DUMP %d", te->dumpId);
1113  else if (act == ACT_RESTORE)
1114  snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1115  else
1116  Assert(false);
1117 }
1118 
1119 /*
1120  * parseWorkerCommand: interpret a command string in a worker.
1121  */
1122 static void
1124  const char *msg)
1125 {
1126  DumpId dumpId;
1127  int nBytes;
1128 
1129  if (messageStartsWith(msg, "DUMP "))
1130  {
1131  *act = ACT_DUMP;
1132  sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1133  Assert(nBytes == strlen(msg));
1134  *te = getTocEntryByDumpId(AH, dumpId);
1135  Assert(*te != NULL);
1136  }
1137  else if (messageStartsWith(msg, "RESTORE "))
1138  {
1139  *act = ACT_RESTORE;
1140  sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1141  Assert(nBytes == strlen(msg));
1142  *te = getTocEntryByDumpId(AH, dumpId);
1143  Assert(*te != NULL);
1144  }
1145  else
1146  pg_fatal("unrecognized command received from leader: \"%s\"",
1147  msg);
1148 }
1149 
1150 /*
1151  * buildWorkerResponse: format a response string to send to the leader.
1152  *
1153  * The string is built in the caller-supplied buffer of size buflen.
1154  */
1155 static void
1157  char *buf, int buflen)
1158 {
1159  snprintf(buf, buflen, "OK %d %d %d",
1160  te->dumpId,
1161  status,
1162  status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1163 }
1164 
1165 /*
1166  * parseWorkerResponse: parse the status message returned by a worker.
1167  *
1168  * Returns the integer status code, and may update fields of AH and/or te.
1169  */
1170 static int
1172  const char *msg)
1173 {
1174  DumpId dumpId;
1175  int nBytes,
1176  n_errors;
1177  int status = 0;
1178 
1179  if (messageStartsWith(msg, "OK "))
1180  {
1181  sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1182 
1183  Assert(dumpId == te->dumpId);
1184  Assert(nBytes == strlen(msg));
1185 
1186  AH->public.n_errors += n_errors;
1187  }
1188  else
1189  pg_fatal("invalid message received from worker: \"%s\"",
1190  msg);
1191 
1192  return status;
1193 }
1194 
1195 /*
1196  * Dispatch a job to some free worker.
1197  *
1198  * te is the TocEntry to be processed, act is the action to be taken on it.
1199  * callback is the function to call on completion of the job.
1200  *
1201  * If no worker is currently available, this will block, and previously
1202  * registered callback functions may be called.
1203  */
1204 void
1206  ParallelState *pstate,
1207  TocEntry *te,
1208  T_Action act,
1210  void *callback_data)
1211 {
1212  int worker;
1213  char buf[256];
1214 
1215  /* Get a worker, waiting if none are idle */
1216  while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1217  WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1218 
1219  /* Construct and send command string */
1220  buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1221 
1222  sendMessageToWorker(pstate, worker, buf);
1223 
1224  /* Remember worker is busy, and which TocEntry it's working on */
1225  pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1226  pstate->parallelSlot[worker].callback = callback;
1227  pstate->parallelSlot[worker].callback_data = callback_data;
1228  pstate->te[worker] = te;
1229 }
1230 
1231 /*
1232  * Find an idle worker and return its slot number.
1233  * Return NO_SLOT if none are idle.
1234  */
1235 static int
1237 {
1238  int i;
1239 
1240  for (i = 0; i < pstate->numWorkers; i++)
1241  {
1242  if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1243  return i;
1244  }
1245  return NO_SLOT;
1246 }
1247 
1248 /*
1249  * Return true iff no worker is running.
1250  */
1251 static bool
1253 {
1254  int i;
1255 
1256  for (i = 0; i < pstate->numWorkers; i++)
1257  {
1259  return false;
1260  }
1261  return true;
1262 }
1263 
1264 /*
1265  * Return true iff every worker is in the WRKR_IDLE state.
1266  */
1267 bool
1269 {
1270  int i;
1271 
1272  for (i = 0; i < pstate->numWorkers; i++)
1273  {
1274  if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1275  return false;
1276  }
1277  return true;
1278 }
1279 
1280 /*
1281  * Acquire lock on a table to be dumped by a worker process.
1282  *
1283  * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1284  * it's no problem for a worker to get one too, but if anything else besides
1285  * pg_dump is running, there's a possible deadlock:
1286  *
1287  * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1288  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1289  * because the leader holds a conflicting ACCESS SHARE lock).
1290  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1291  * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1292  * 4) Now we have a deadlock, since the leader is effectively waiting for
1293  * the worker. The server cannot detect that, however.
1294  *
1295  * To prevent an infinite wait, prior to touching a table in a worker, request
1296  * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1297  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1298  * so we have a deadlock. We must fail the backup in that case.
1299  */
1300 static void
1302 {
1303  const char *qualId;
1304  PQExpBuffer query;
1305  PGresult *res;
1306 
1307  /* Nothing to do for BLOBS */
1308  if (strcmp(te->desc, "BLOBS") == 0)
1309  return;
1310 
1311  query = createPQExpBuffer();
1312 
1313  qualId = fmtQualifiedId(te->namespace, te->tag);
1314 
1315  appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1316  qualId);
1317 
1318  res = PQexec(AH->connection, query->data);
1319 
1320  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1321  pg_fatal("could not obtain lock on relation \"%s\"\n"
1322  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1323  "on the table after the pg_dump parent process had gotten the "
1324  "initial ACCESS SHARE lock on the table.", qualId);
1325 
1326  PQclear(res);
1327  destroyPQExpBuffer(query);
1328 }
1329 
1330 /*
1331  * WaitForCommands: main routine for a worker process.
1332  *
1333  * Read and execute commands from the leader until we see EOF on the pipe.
1334  */
1335 static void
1336 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1337 {
1338  char *command;
1339  TocEntry *te;
1340  T_Action act;
1341  int status = 0;
1342  char buf[256];
1343 
1344  for (;;)
1345  {
1346  if (!(command = getMessageFromLeader(pipefd)))
1347  {
1348  /* EOF, so done */
1349  return;
1350  }
1351 
1352  /* Decode the command */
1353  parseWorkerCommand(AH, &te, &act, command);
1354 
1355  if (act == ACT_DUMP)
1356  {
1357  /* Acquire lock on this table within the worker's session */
1358  lockTableForWorker(AH, te);
1359 
1360  /* Perform the dump command */
1361  status = (AH->WorkerJobDumpPtr) (AH, te);
1362  }
1363  else if (act == ACT_RESTORE)
1364  {
1365  /* Perform the restore command */
1366  status = (AH->WorkerJobRestorePtr) (AH, te);
1367  }
1368  else
1369  Assert(false);
1370 
1371  /* Return status to leader */
1372  buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1373 
1374  sendMessageToLeader(pipefd, buf);
1375 
1376  /* command was pg_malloc'd and we are responsible for free()ing it. */
1377  free(command);
1378  }
1379 }
1380 
1381 /*
1382  * Check for status messages from workers.
1383  *
1384  * If do_wait is true, wait to get a status message; otherwise, just return
1385  * immediately if there is none available.
1386  *
1387  * When we get a status message, we pass the status code to the callback
1388  * function that was specified to DispatchJobForTocEntry, then reset the
1389  * worker status to IDLE.
1390  *
1391  * Returns true if we collected a status message, else false.
1392  *
1393  * XXX is it worth checking for more than one status message per call?
1394  * It seems somewhat unlikely that multiple workers would finish at exactly
1395  * the same time.
1396  */
1397 static bool
1399 {
1400  int worker;
1401  char *msg;
1402 
1403  /* Try to collect a status message */
1404  msg = getMessageFromWorker(pstate, do_wait, &worker);
1405 
1406  if (!msg)
1407  {
1408  /* If do_wait is true, we must have detected EOF on some socket */
1409  if (do_wait)
1410  pg_fatal("a worker process died unexpectedly");
1411  return false;
1412  }
1413 
1414  /* Process it and update our idea of the worker's status */
1415  if (messageStartsWith(msg, "OK "))
1416  {
1417  ParallelSlot *slot = &pstate->parallelSlot[worker];
1418  TocEntry *te = pstate->te[worker];
1419  int status;
1420 
1421  status = parseWorkerResponse(AH, te, msg);
1422  slot->callback(AH, te, status, slot->callback_data);
1423  slot->workerStatus = WRKR_IDLE;
1424  pstate->te[worker] = NULL;
1425  }
1426  else
1427  pg_fatal("invalid message received from worker: \"%s\"",
1428  msg);
1429 
1430  /* Free the string returned from getMessageFromWorker */
1431  free(msg);
1432 
1433  return true;
1434 }
1435 
1436 /*
1437  * Check for status results from workers, waiting if necessary.
1438  *
1439  * Available wait modes are:
1440  * WFW_NO_WAIT: reap any available status, but don't block
1441  * WFW_GOT_STATUS: wait for at least one more worker to finish
1442  * WFW_ONE_IDLE: wait for at least one worker to be idle
1443  * WFW_ALL_IDLE: wait for all workers to be idle
1444  *
1445  * Any received results are passed to the callback specified to
1446  * DispatchJobForTocEntry.
1447  *
1448  * This function is executed in the leader process.
1449  */
1450 void
1452 {
1453  bool do_wait = false;
1454 
1455  /*
1456  * In GOT_STATUS mode, always block waiting for a message, since we can't
1457  * return till we get something. In other modes, we don't block the first
1458  * time through the loop.
1459  */
1460  if (mode == WFW_GOT_STATUS)
1461  {
1462  /* Assert that caller knows what it's doing */
1463  Assert(!IsEveryWorkerIdle(pstate));
1464  do_wait = true;
1465  }
1466 
1467  for (;;)
1468  {
1469  /*
1470  * Check for status messages, even if we don't need to block. We do
1471  * not try very hard to reap all available messages, though, since
1472  * there's unlikely to be more than one.
1473  */
1474  if (ListenToWorkers(AH, pstate, do_wait))
1475  {
1476  /*
1477  * If we got a message, we are done by definition for GOT_STATUS
1478  * mode, and we can also be certain that there's at least one idle
1479  * worker. So we're done in all but ALL_IDLE mode.
1480  */
1481  if (mode != WFW_ALL_IDLE)
1482  return;
1483  }
1484 
1485  /* Check whether we must wait for new status messages */
1486  switch (mode)
1487  {
1488  case WFW_NO_WAIT:
1489  return; /* never wait */
1490  case WFW_GOT_STATUS:
1491  Assert(false); /* can't get here, because we waited */
1492  break;
1493  case WFW_ONE_IDLE:
1494  if (GetIdleWorker(pstate) != NO_SLOT)
1495  return;
1496  break;
1497  case WFW_ALL_IDLE:
1498  if (IsEveryWorkerIdle(pstate))
1499  return;
1500  break;
1501  }
1502 
1503  /* Loop back, and this time wait for something to happen */
1504  do_wait = true;
1505  }
1506 }
1507 
1508 /*
1509  * Read one command message from the leader, blocking if necessary
1510  * until one is available, and return it as a malloc'd string.
1511  * On EOF, return NULL.
1512  *
1513  * This function is executed in worker processes.
1514  */
1515 static char *
1516 getMessageFromLeader(int pipefd[2])
1517 {
1518  return readMessageFromPipe(pipefd[PIPE_READ]);
1519 }
1520 
1521 /*
1522  * Send a status message to the leader.
1523  *
1524  * This function is executed in worker processes.
1525  */
1526 static void
1527 sendMessageToLeader(int pipefd[2], const char *str)
1528 {
1529  int len = strlen(str) + 1;
1530 
1531  if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1532  pg_fatal("could not write to the communication channel: %m");
1533 }
1534 
/*
 * Block in select() until some descriptor in *workerset is readable.
 *
 * The caller's set is remembered up front and restored before every call,
 * because select() overwrites it.  Calls interrupted by a signal (EINTR, or
 * WSAEINTR on Windows) are retried.  On return *workerset holds the ready
 * descriptors.  Returns the number of ready descriptors, or select()'s
 * error result.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;
	bool		interrupted;

	do
	{
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif
	} while (interrupted);

	return nready;
}
1562 
1563 
1564 /*
1565  * Check for messages from worker processes.
1566  *
1567  * If a message is available, return it as a malloc'd string, and put the
1568  * index of the sending worker in *worker.
1569  *
1570  * If nothing is available, wait if "do_wait" is true, else return NULL.
1571  *
1572  * If we detect EOF on any socket, we'll return NULL. It's not great that
1573  * that's hard to distinguish from the no-data-available case, but for now
1574  * our one caller is okay with that.
1575  *
1576  * This function is executed in the leader process.
1577  */
1578 static char *
1579 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1580 {
1581  int i;
1582  fd_set workerset;
1583  int maxFd = -1;
1584  struct timeval nowait = {0, 0};
1585 
1586  /* construct bitmap of socket descriptors for select() */
1587  FD_ZERO(&workerset);
1588  for (i = 0; i < pstate->numWorkers; i++)
1589  {
1591  continue;
1592  FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1593  if (pstate->parallelSlot[i].pipeRead > maxFd)
1594  maxFd = pstate->parallelSlot[i].pipeRead;
1595  }
1596 
1597  if (do_wait)
1598  {
1599  i = select_loop(maxFd, &workerset);
1600  Assert(i != 0);
1601  }
1602  else
1603  {
1604  if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1605  return NULL;
1606  }
1607 
1608  if (i < 0)
1609  pg_fatal("%s() failed: %m", "select");
1610 
1611  for (i = 0; i < pstate->numWorkers; i++)
1612  {
1613  char *msg;
1614 
1616  continue;
1617  if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1618  continue;
1619 
1620  /*
1621  * Read the message if any. If the socket is ready because of EOF,
1622  * we'll return NULL instead (and the socket will stay ready, so the
1623  * condition will persist).
1624  *
1625  * Note: because this is a blocking read, we'll wait if only part of
1626  * the message is available. Waiting a long time would be bad, but
1627  * since worker status messages are short and are always sent in one
1628  * operation, it shouldn't be a problem in practice.
1629  */
1630  msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1631  *worker = i;
1632  return msg;
1633  }
1634  Assert(false);
1635  return NULL;
1636 }
1637 
1638 /*
1639  * Send a command message to the specified worker process.
1640  *
1641  * This function is executed in the leader process.
1642  */
1643 static void
1644 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1645 {
1646  int len = strlen(str) + 1;
1647 
1648  if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1649  {
1650  pg_fatal("could not write to the communication channel: %m");
1651  }
1652 }
1653 
1654 /*
1655  * Read one message from the specified pipe (fd), blocking if necessary
1656  * until one is available, and return it as a malloc'd string.
1657  * On EOF, return NULL.
1658  *
1659  * A "message" on the channel is just a null-terminated string.
1660  */
1661 static char *
1663 {
1664  char *msg;
1665  int msgsize,
1666  bufsize;
1667  int ret;
1668 
1669  /*
1670  * In theory, if we let piperead() read multiple bytes, it might give us
1671  * back fragments of multiple messages. (That can't actually occur, since
1672  * neither leader nor workers send more than one message without waiting
1673  * for a reply, but we don't wish to assume that here.) For simplicity,
1674  * read a byte at a time until we get the terminating '\0'. This method
1675  * is a bit inefficient, but since this is only used for relatively short
1676  * command and status strings, it shouldn't matter.
1677  */
1678  bufsize = 64; /* could be any number */
1679  msg = (char *) pg_malloc(bufsize);
1680  msgsize = 0;
1681  for (;;)
1682  {
1683  Assert(msgsize < bufsize);
1684  ret = piperead(fd, msg + msgsize, 1);
1685  if (ret <= 0)
1686  break; /* error or connection closure */
1687 
1688  Assert(ret == 1);
1689 
1690  if (msg[msgsize] == '\0')
1691  return msg; /* collected whole message */
1692 
1693  msgsize++;
1694  if (msgsize == bufsize) /* enlarge buffer if needed */
1695  {
1696  bufsize += 16; /* could be any number */
1697  msg = (char *) pg_realloc(msg, bufsize);
1698  }
1699  }
1700 
1701  /* Other end has closed the connection */
1702  pg_free(msg);
1703  return NULL;
1704 }
1705 
#ifdef WIN32

/*
 * pgpipe: Windows substitute for pipe(2) whose handles work with select().
 *
 * Builds a connected pair of loopback TCP sockets.  A temporary listener is
 * bound to an ephemeral port on INADDR_LOOPBACK, and a second socket
 * connects to it.  handles[1] receives the connecting socket and handles[0]
 * the accepted one.  Returns 0 on success.  On failure it logs the Winsock
 * error code, leaves both handles at -1, and returns -1.
 *
 * Reads and writes on these handles must go through piperead()/pipewrite().
 * The handles are returned as "int" for consistency with Unix.  This is okay
 * even on WIN64 because system handles are not more than 32 bits wide, but
 * it does require some casting.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	listener;
	pgsocket	sock;
	struct sockaddr_in addr;
	int			addrlen = sizeof(addr);

	/* Use the Unix invalid-descriptor value until both ends exist */
	handles[0] = -1;
	handles[1] = -1;

	/* Temporary listening socket on an ephemeral loopback port */
	listener = socket(AF_INET, SOCK_STREAM, 0);
	if (listener == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create socket: error code %d",
					 WSAGetLastError());
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = pg_hton16(0);
	addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);

	if (bind(listener, (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not bind: error code %d",
					 WSAGetLastError());
		goto close_listener;
	}
	if (listen(listener, 1) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not listen: error code %d",
					 WSAGetLastError());
		goto close_listener;
	}
	/* Find out which port was actually assigned */
	if (getsockname(listener, (SOCKADDR *) &addr, &addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
					 WSAGetLastError());
		goto close_listener;
	}

	/* The connecting socket becomes handles[1] */
	sock = socket(AF_INET, SOCK_STREAM, 0);
	if (sock == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create second socket: error code %d",
					 WSAGetLastError());
		goto close_listener;
	}
	handles[1] = (int) sock;

	if (connect(handles[1], (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not connect socket: error code %d",
					 WSAGetLastError());
		goto close_handle1;
	}

	/* The accepted socket becomes handles[0] */
	sock = accept(listener, (SOCKADDR *) &addr, &addrlen);
	if (sock == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not accept connection: error code %d",
					 WSAGetLastError());
		goto close_handle1;
	}
	handles[0] = (int) sock;

	/* The listener is not needed once the pair is connected */
	closesocket(listener);
	return 0;

close_handle1:
	closesocket(handles[1]);
	handles[1] = -1;
close_listener:
	closesocket(listener);
	return -1;
}

#endif							/* WIN32 */
struct WorkerInfoData * WorkerInfo
Definition: autovacuum.c:237
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition: parallel.c:1059
static void sendMessageToLeader(int pipefd[2], const char *str)
Definition: parallel.c:1527
struct DumpSignalInformation DumpSignalInformation
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
Definition: parallel.c:264
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
Definition: parallel.c:1336
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition: parallel.c:1451
T_WorkerStatus
Definition: parallel.c:76
@ WRKR_WORKING
Definition: parallel.c:79
@ WRKR_IDLE
Definition: parallel.c:78
@ WRKR_TERMINATED
Definition: parallel.c:80
@ WRKR_NOT_STARTED
Definition: parallel.c:77
static bool HasEveryWorkerTerminated(ParallelState *pstate)
Definition: parallel.c:1252
#define pgpipe(a)
Definition: parallel.c:137
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
Definition: parallel.c:1398
static void sigTermHandler(SIGNAL_ARGS)
Definition: parallel.c:545
#define PIPE_READ
Definition: parallel.c:69
static char * readMessageFromPipe(int fd)
Definition: parallel.c:1662
static int select_loop(int maxFd, fd_set *workerset)
Definition: parallel.c:1540
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
Definition: parallel.c:1171
static int GetIdleWorker(ParallelState *pstate)
Definition: parallel.c:1236
static void set_cancel_pstate(ParallelState *pstate)
Definition: parallel.c:789
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
Definition: parallel.c:829
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
Definition: parallel.c:809
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
Definition: parallel.c:1108
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
Definition: parallel.c:1579
static void archive_close_connection(int code, void *arg)
Definition: parallel.c:339
#define NO_SLOT
Definition: parallel.c:72
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
Definition: parallel.c:1644
#define PIPE_WRITE
Definition: parallel.c:70
static ShutdownInformation shutdown_info
Definition: parallel.c:152
void on_exit_close_archive(Archive *AHX)
Definition: parallel.c:328
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition: parallel.c:1205
#define WORKER_IS_RUNNING(workerStatus)
Definition: parallel.c:83
static char * getMessageFromLeader(int pipefd[2])
Definition: parallel.c:1516
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
Definition: parallel.c:1301
#define piperead(a, b, c)
Definition: parallel.c:138
struct ShutdownInformation ShutdownInformation
#define pipewrite(a, b, c)
Definition: parallel.c:139
void init_parallel_dump_utils(void)
Definition: parallel.c:236
static void set_cancel_handler(void)
Definition: parallel.c:608
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
Definition: parallel.c:1156
static volatile DumpSignalInformation signal_info
Definition: parallel.c:173
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition: parallel.c:1268
#define write_stderr(str)
Definition: parallel.c:184
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
Definition: parallel.c:1123
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition: parallel.c:897
#define messageStartsWith(msg, prefix)
Definition: parallel.c:226
static void ShutdownWorkersHard(ParallelState *pstate)
Definition: parallel.c:395
static void WaitForTerminatingWorkers(ParallelState *pstate)
Definition: parallel.c:446
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
Definition: parallel.c:730
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
Definition: parallel.h:24
WFW_WaitOption
Definition: parallel.h:31
@ WFW_ALL_IDLE
Definition: parallel.h:35
@ WFW_GOT_STATUS
Definition: parallel.h:33
@ WFW_NO_WAIT
Definition: parallel.h:32
@ WFW_ONE_IDLE
Definition: parallel.h:34
#define SIGNAL_ARGS
Definition: c.h:1332
void err(int eval, const char *fmt,...)
Definition: err.c:43
PGcancel * PQgetCancel(PGconn *conn)
Definition: fe-cancel.c:348
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition: fe-cancel.c:462
void PQfreeCancel(PGcancel *cancel)
Definition: fe-cancel.c:416
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3371
PGresult * PQexec(PGconn *conn, const char *query)
Definition: fe-exec.c:2224
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define free(a)
Definition: header.h:65
#define bufsize
Definition: indent_globs.h:36
int j
Definition: isn.c:74
int i
Definition: isn.c:73
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:100
static void const char fflush(stdout)
Assert(fmt[strlen(fmt) - 1] !='\n')
exit(1)
#define pg_log_error(...)
Definition: logging.h:106
const char * progname
Definition: main.c:44
int DumpId
Definition: pg_backup.h:268
void DisconnectDatabase(Archive *AHX)
Definition: pg_backup_db.c:225
void DeCloneArchive(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
#define WORKER_IGNORED_ERRORS
@ ACT_RESTORE
@ ACT_DUMP
void * arg
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
#define pg_fatal(...)
#define pg_hton32(x)
Definition: pg_bswap.h:121
#define pg_hton16(x)
Definition: pg_bswap.h:120
static PgChecksumMode mode
Definition: pg_checksums.c:56
const void size_t len
static bool do_wait
Definition: pg_ctl.c:74
static char * buf
Definition: pg_test_fsync.c:73
pqsigfunc pqsignal(int signo, pqsigfunc func)
int pgsocket
Definition: port.h:29
#define snprintf
Definition: port.h:238
#define PGINVALID_SOCKET
Definition: port.h:31
#define closesocket
Definition: port.h:349
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:146
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
PQExpBufferData * PQExpBuffer
Definition: pqexpbuffer.h:51
static int fd(const char *x, int i)
Definition: preproc-init.c:105
PGconn * conn
Definition: streamutil.c:54
const char * fmtQualifiedId(const char *schema, const char *id)
Definition: string_utils.c:145
PQExpBuffer(* getLocalPQExpBuffer)(void)
Definition: string_utils.c:27
int n_errors
Definition: pg_backup.h:236
int numWorkers
Definition: pg_backup.h:223
ArchiveHandle * myAH
Definition: parallel.c:165
ParallelState * pstate
Definition: parallel.c:166
ParallelCompletionPtr callback
Definition: parallel.c:98
ArchiveHandle * AH
Definition: parallel.c:101
void * callback_data
Definition: parallel.c:99
int pipeRead
Definition: parallel.c:103
T_WorkerStatus workerStatus
Definition: parallel.c:95
int pipeWrite
Definition: parallel.c:104
int pipeRevRead
Definition: parallel.c:105
int pipeRevWrite
Definition: parallel.c:106
pid_t pid
Definition: parallel.c:113
TocEntry ** te
Definition: parallel.h:59
ParallelSlot * parallelSlot
Definition: parallel.h:60
int numWorkers
Definition: parallel.h:57
ParallelState * pstate
Definition: parallel.c:148
WorkerJobDumpPtrType WorkerJobDumpPtr
PGcancel *volatile connCancel
WorkerJobRestorePtrType WorkerJobRestorePtr
SetupWorkerPtrType SetupWorkerPtr
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
Definition: test_ifaddrs.c:46
#define bind(s, addr, addrlen)
Definition: win32_port.h:491
#define EINTR
Definition: win32_port.h:374
#define SIGPIPE
Definition: win32_port.h:173
#define SIGQUIT
Definition: win32_port.h:169
#define kill(pid, sig)
Definition: win32_port.h:485
#define SIG_IGN
Definition: win32_port.h:165
#define socket(af, type, protocol)
Definition: win32_port.h:490
#define accept(s, addr, addrlen)
Definition: win32_port.h:493
#define connect(s, name, namelen)
Definition: win32_port.h:494
#define listen(s, backlog)
Definition: win32_port.h:492
#define select(n, r, w, e, timeout)
Definition: win32_port.h:495