PostgreSQL Source Code git master
Loading...
Searching...
No Matches
parallel.c File Reference
#include "postgres_fe.h"
#include <sys/select.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#include "fe_utils/string_utils.h"
#include "parallel.h"
#include "pg_backup_utils.h"
Include dependency graph for parallel.c:

Go to the source code of this file.

Data Structures

struct  ParallelSlot
 
struct  ShutdownInformation
 
struct  DumpSignalInformation
 

Macros

#define PIPE_READ   0
 
#define PIPE_WRITE   1
 
#define NO_SLOT   (-1) /* Failure result for GetIdleWorker() */
 
#define WORKER_IS_RUNNING(workerStatus)    ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
 
#define pgpipe(a)   pipe(a)
 
#define piperead(a, b, c)   read(a,b,c)
 
#define pipewrite(a, b, c)   write(a,b,c)
 
#define write_stderr(str)
 
#define messageStartsWith(msg, prefix)    (strncmp(msg, prefix, strlen(prefix)) == 0)
 

Typedefs

typedef struct ShutdownInformation ShutdownInformation
 
typedef struct DumpSignalInformation DumpSignalInformation
 

Enumerations

enum  T_WorkerStatus { WRKR_NOT_STARTED = 0 , WRKR_IDLE , WRKR_WORKING , WRKR_TERMINATED }
 

Functions

static ParallelSlot *GetMyPSlot (ParallelState *pstate)
 
static void archive_close_connection (int code, void *arg)
 
static void ShutdownWorkersHard (ParallelState *pstate)
 
static void WaitForTerminatingWorkers (ParallelState *pstate)
 
static void set_cancel_handler (void)
 
static void set_cancel_pstate (ParallelState *pstate)
 
static void set_cancel_slot_archive (ParallelSlot *slot, ArchiveHandle *AH)
 
static void RunWorker (ArchiveHandle *AH, ParallelSlot *slot)
 
static int GetIdleWorker (ParallelState *pstate)
 
static bool HasEveryWorkerTerminated (ParallelState *pstate)
 
static void lockTableForWorker (ArchiveHandle *AH, TocEntry *te)
 
static void WaitForCommands (ArchiveHandle *AH, int pipefd[2])
 
static bool ListenToWorkers (ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
 
static char *getMessageFromLeader (int pipefd[2])
 
static void sendMessageToLeader (int pipefd[2], const char *str)
 
static int select_loop (int maxFd, fd_set *workerset)
 
static char *getMessageFromWorker (ParallelState *pstate, bool do_wait, int *worker)
 
static void sendMessageToWorker (ParallelState *pstate, int worker, const char *str)
 
static char *readMessageFromPipe (int fd)
 
void init_parallel_dump_utils (void)
 
void on_exit_close_archive (Archive *AHX)
 
static void sigTermHandler (SIGNAL_ARGS)
 
void set_archive_cancel_info (ArchiveHandle *AH, PGconn *conn)
 
ParallelState *ParallelBackupStart (ArchiveHandle *AH)
 
void ParallelBackupEnd (ArchiveHandle *AH, ParallelState *pstate)
 
static void buildWorkerCommand (ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
 
static void parseWorkerCommand (ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
 
static void buildWorkerResponse (ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
 
static int parseWorkerResponse (ArchiveHandle *AH, TocEntry *te, const char *msg)
 
void DispatchJobForTocEntry (ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
 
bool IsEveryWorkerIdle (ParallelState *pstate)
 
void WaitForWorkers (ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
 

Variables

static ShutdownInformation shutdown_info
 
static volatile DumpSignalInformation signal_info
 

Macro Definition Documentation

◆ messageStartsWith

#define messageStartsWith (   msg,
  prefix 
)     (strncmp(msg, prefix, strlen(prefix)) == 0)

Definition at line 228 of file parallel.c.

238{
239#ifdef WIN32
241 {
243 int err;
244
245 /* Prepare for threaded operation */
248
249 /* Initialize socket access */
250 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
251 if (err != 0)
252 pg_fatal("%s() failed: error code %d", "WSAStartup", err);
253
254 parallel_init_done = true;
255 }
256#endif
257}
258
259/*
260 * Find the ParallelSlot for the current worker process or thread.
261 *
262 * Returns NULL if no matching slot is found (this implies we're the leader).
263 */
264static ParallelSlot *
266{
267 int i;
268
269 for (i = 0; i < pstate->numWorkers; i++)
270 {
271#ifdef WIN32
272 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
273#else
274 if (pstate->parallelSlot[i].pid == getpid())
275#endif
276 return &(pstate->parallelSlot[i]);
277 }
278
279 return NULL;
280}
281
282/*
283 * A thread-local version of getLocalPQExpBuffer().
284 *
285 * Non-reentrant but reduces memory leakage: we'll consume one buffer per
286 * thread, which is much better than one per fmtId/fmtQualifiedId call.
287 */
288#ifdef WIN32
289static PQExpBuffer
291{
292 /*
293 * The Tls code goes awry if we use a static var, so we provide for both
294 * static and auto, and omit any use of the static var when using Tls. We
295 * rely on TlsGetValue() to return 0 if the value is not yet set.
296 */
299
302 else
304
305 if (id_return) /* first time through? */
306 {
307 /* same buffer, just wipe contents */
309 }
310 else
311 {
312 /* new buffer */
316 else
318 }
319
320 return id_return;
321}
322#endif /* WIN32 */
323
324/*
325 * pg_dump and pg_restore call this to register the cleanup handler
326 * as soon as they've created the ArchiveHandle.
327 */
328void
330{
331 shutdown_info.AHX = AHX;
333}
334
335/*
336 * on_exit_nicely handler for shutting down database connections and
337 * worker processes cleanly.
338 */
339static void
340archive_close_connection(int code, void *arg)
341{
343
344 if (si->pstate)
345 {
346 /* In parallel mode, must figure out who we are */
347 ParallelSlot *slot = GetMyPSlot(si->pstate);
348
349 if (!slot)
350 {
351 /*
352 * We're the leader. Forcibly shut down workers, then close our
353 * own database connection, if any.
354 */
355 ShutdownWorkersHard(si->pstate);
356
357 if (si->AHX)
359 }
360 else
361 {
362 /*
363 * We're a worker. Shut down our own DB connection if any. On
364 * Windows, we also have to close our communication sockets, to
365 * emulate what will happen on Unix when the worker process exits.
366 * (Without this, if this is a premature exit, the leader would
367 * fail to detect it because there would be no EOF condition on
368 * the other end of the pipe.)
369 */
370 if (slot->AH)
371 DisconnectDatabase(&(slot->AH->public));
372
373#ifdef WIN32
376#endif
377 }
378 }
379 else
380 {
381 /* Non-parallel operation: just kill the leader DB connection */
382 if (si->AHX)
384 }
385}
386
387/*
388 * Forcibly shut down any remaining workers, waiting for them to finish.
389 *
390 * Note that we don't expect to come here during normal exit (the workers
391 * should be long gone, and the ParallelState too). We're only here in a
392 * pg_fatal() situation, so intervening to cancel active commands is
393 * appropriate.
394 */
395static void
397{
398 int i;
399
400 /*
401 * Close our write end of the sockets so that any workers waiting for
402 * commands know they can exit. (Note: some of the pipeWrite fields might
403 * still be zero, if we failed to initialize all the workers. Hence, just
404 * ignore errors here.)
405 */
406 for (i = 0; i < pstate->numWorkers; i++)
408
409 /*
410 * Force early termination of any commands currently in progress.
411 */
412#ifndef WIN32
413 /* On non-Windows, send SIGTERM to each worker process. */
414 for (i = 0; i < pstate->numWorkers; i++)
415 {
416 pid_t pid = pstate->parallelSlot[i].pid;
417
418 if (pid != 0)
419 kill(pid, SIGTERM);
420 }
421#else
422
423 /*
424 * On Windows, send query cancels directly to the workers' backends. Use
425 * a critical section to ensure worker threads don't change state.
426 */
428 for (i = 0; i < pstate->numWorkers; i++)
429 {
430 ArchiveHandle *AH = pstate->parallelSlot[i].AH;
431 char errbuf[1];
432
433 if (AH != NULL && AH->connCancel != NULL)
434 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
435 }
437#endif
438
439 /* Now wait for them to terminate. */
441}
442
443/*
444 * Wait for all workers to terminate.
445 */
446static void
448{
449 while (!HasEveryWorkerTerminated(pstate))
450 {
451 ParallelSlot *slot = NULL;
452 int j;
453
454#ifndef WIN32
455 /* On non-Windows, use wait() to wait for next worker to end */
456 int status;
457 pid_t pid = wait(&status);
458
459 /* Find dead worker's slot, and clear the PID field */
460 for (j = 0; j < pstate->numWorkers; j++)
461 {
462 slot = &(pstate->parallelSlot[j]);
463 if (slot->pid == pid)
464 {
465 slot->pid = 0;
466 break;
467 }
468 }
469#else /* WIN32 */
470 /* On Windows, we must use WaitForMultipleObjects() */
471 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
472 int nrun = 0;
473 DWORD ret;
475
476 for (j = 0; j < pstate->numWorkers; j++)
477 {
479 {
480 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
481 nrun++;
482 }
483 }
485 Assert(ret != WAIT_FAILED);
488
489 /* Find dead worker's slot, and clear the hThread field */
490 for (j = 0; j < pstate->numWorkers; j++)
491 {
492 slot = &(pstate->parallelSlot[j]);
493 if (slot->hThread == hThread)
494 {
495 /* For cleanliness, close handles for dead threads */
496 CloseHandle((HANDLE) slot->hThread);
497 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
498 break;
499 }
500 }
501#endif /* WIN32 */
502
503 /* On all platforms, update workerStatus and te[] as well */
504 Assert(j < pstate->numWorkers);
506 pstate->te[j] = NULL;
507 }
508}
509
510
511/*
512 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
513 *
514 * This doesn't quite belong in this module, but it needs access to the
515 * ParallelState data, so there's not really a better place either.
516 *
517 * When we get a cancel interrupt, we could just die, but in pg_restore that
518 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
519 * for a long time. Instead, we try to send a cancel request and then die.
520 * pg_dump probably doesn't really need this, but we might as well use it
521 * there too. Note that sending the cancel directly from the signal handler
522 * is safe because PQcancel() is written to make it so.
523 *
524 * In parallel operation on Unix, each process is responsible for canceling
525 * its own connection (this must be so because nobody else has access to it).
526 * Furthermore, the leader process should attempt to forward its signal to
527 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
528 * needed because typing control-C at the console would deliver SIGINT to
529 * every member of the terminal process group --- but in other scenarios it
530 * might be that only the leader gets signaled.
531 *
532 * On Windows, the cancel handler runs in a separate thread, because that's
533 * how SetConsoleCtrlHandler works. We make it stop worker threads, send
534 * cancels on all active connections, and then return FALSE, which will allow
535 * the process to die. For safety's sake, we use a critical section to
536 * protect the PGcancel structures against being changed while the signal
537 * thread runs.
538 */
539
540#ifndef WIN32
541
542/*
543 * Signal handler (Unix only)
544 */
545static void
547{
548 int i;
549 char errbuf[1];
550
551 /*
552 * Some platforms allow delivery of new signals to interrupt an active
553 * signal handler. That could muck up our attempt to send PQcancel, so
554 * disable the signals that set_cancel_handler enabled.
555 */
559
560 /*
561 * If we're in the leader, forward signal to all workers. (It seems best
562 * to do this before PQcancel; killing the leader transaction will result
563 * in invalid-snapshot errors from active workers, which maybe we can
564 * quiet by killing workers first.) Ignore any errors.
565 */
566 if (signal_info.pstate != NULL)
567 {
568 for (i = 0; i < signal_info.pstate->numWorkers; i++)
569 {
571
572 if (pid != 0)
573 kill(pid, SIGTERM);
574 }
575 }
576
577 /*
578 * Send QueryCancel if we have a connection to send to. Ignore errors,
579 * there's not much we can do about them anyway.
580 */
582 (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
583
584 /*
585 * Report we're quitting, using nothing more complicated than write(2).
586 * When in parallel operation, only the leader process should do this.
587 */
589 {
590 if (progname)
591 {
593 write_stderr(": ");
594 }
595 write_stderr("terminated by user\n");
596 }
597
598 /*
599 * And die, using _exit() not exit() because the latter will invoke atexit
600 * handlers that can fail if we interrupted related code.
601 */
602 _exit(1);
603}
604
605/*
606 * Enable cancel interrupt handler, if not already done.
607 */
608static void
610{
611 /*
612 * When forking, signal_info.handler_set will propagate into the new
613 * process, but that's fine because the signal handler state does too.
614 */
616 {
618
622 }
623}
624
625#else /* WIN32 */
626
627/*
628 * Console interrupt handler --- runs in a newly-started thread.
629 *
630 * After stopping other threads and sending cancel requests on all open
631 * connections, we return FALSE which will allow the default ExitProcess()
632 * action to be taken.
633 */
634static BOOL WINAPI
636{
637 int i;
638 char errbuf[1];
639
640 if (dwCtrlType == CTRL_C_EVENT ||
642 {
643 /* Critical section prevents changing data we look at here */
645
646 /*
647 * If in parallel mode, stop worker threads and send QueryCancel to
648 * their connected backends. The main point of stopping the worker
649 * threads is to keep them from reporting the query cancels as errors,
650 * which would clutter the user's screen. We needn't stop the leader
651 * thread since it won't be doing much anyway. Do this before
652 * canceling the main transaction, else we might get invalid-snapshot
653 * errors reported before we can stop the workers. Ignore errors,
654 * there's not much we can do about them anyway.
655 */
656 if (signal_info.pstate != NULL)
657 {
658 for (i = 0; i < signal_info.pstate->numWorkers; i++)
659 {
661 ArchiveHandle *AH = slot->AH;
662 HANDLE hThread = (HANDLE) slot->hThread;
663
664 /*
665 * Using TerminateThread here may leave some resources leaked,
666 * but it doesn't matter since we're about to end the whole
667 * process.
668 */
671
672 if (AH != NULL && AH->connCancel != NULL)
673 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
674 }
675 }
676
677 /*
678 * Send QueryCancel to leader connection, if enabled. Ignore errors,
679 * there's not much we can do about them anyway.
680 */
683 errbuf, sizeof(errbuf));
684
686
687 /*
688 * Report we're quitting, using nothing more complicated than
689 * write(2). (We might be able to get away with using pg_log_*()
690 * here, but since we terminated other threads uncleanly above, it
691 * seems better to assume as little as possible.)
692 */
693 if (progname)
694 {
696 write_stderr(": ");
697 }
698 write_stderr("terminated by user\n");
699 }
700
701 /* Always return FALSE to allow signal handling to continue */
702 return FALSE;
703}
704
705/*
706 * Enable cancel interrupt handler, if not already done.
707 */
708static void
710{
712 {
714
716
718 }
719}
720
721#endif /* WIN32 */
722
723
724/*
725 * set_archive_cancel_info
726 *
727 * Fill AH->connCancel with cancellation info for the specified database
728 * connection; or clear it if conn is NULL.
729 */
730void
732{
734
735 /*
736 * Activate the interrupt handler if we didn't yet in this process. On
737 * Windows, this also initializes signal_info_lock; therefore it's
738 * important that this happen at least once before we fork off any
739 * threads.
740 */
742
743 /*
744 * On Unix, we assume that storing a pointer value is atomic with respect
745 * to any possible signal interrupt. On Windows, use a critical section.
746 */
747
748#ifdef WIN32
750#endif
751
752 /* Free the old one if we have one */
754 /* be sure interrupt handler doesn't use pointer while freeing */
755 AH->connCancel = NULL;
756
757 if (oldConnCancel != NULL)
759
760 /* Set the new one if specified */
761 if (conn)
763
764 /*
765 * On Unix, there's only ever one active ArchiveHandle per process, so we
766 * can just set signal_info.myAH unconditionally. On Windows, do that
767 * only in the main thread; worker threads have to make sure their
768 * ArchiveHandle appears in the pstate data, which is dealt with in
769 * RunWorker().
770 */
771#ifndef WIN32
772 signal_info.myAH = AH;
773#else
775 signal_info.myAH = AH;
776#endif
777
778#ifdef WIN32
780#endif
781}
782
783/*
784 * set_cancel_pstate
785 *
786 * Set signal_info.pstate to point to the specified ParallelState, if any.
787 * We need this mainly to have an interlock against Windows signal thread.
788 */
789static void
791{
792#ifdef WIN32
794#endif
795
796 signal_info.pstate = pstate;
797
798#ifdef WIN32
800#endif
801}
802
803/*
804 * set_cancel_slot_archive
805 *
806 * Set ParallelSlot's AH field to point to the specified archive, if any.
807 * We need this mainly to have an interlock against Windows signal thread.
808 */
809static void
811{
812#ifdef WIN32
814#endif
815
816 slot->AH = AH;
817
818#ifdef WIN32
820#endif
821}
822
823
824/*
825 * This function is called by both Unix and Windows variants to set up
826 * and run a worker process. Caller should exit the process (or thread)
827 * upon return.
828 */
829static void
831{
832 int pipefd[2];
833
834 /* fetch child ends of pipes */
837
838 /*
839 * Clone the archive so that we have our own state to work with, and in
840 * particular our own database connection.
841 *
842 * We clone on Unix as well as Windows, even though technically we don't
843 * need to because fork() gives us a copy in our own address space
844 * already. But CloneArchive resets the state information and also clones
845 * the database connection which both seem kinda helpful.
846 */
847 AH = CloneArchive(AH);
848
849 /* Remember cloned archive where signal handler can find it */
850 set_cancel_slot_archive(slot, AH);
851
852 /*
853 * Call the setup worker function that's defined in the ArchiveHandle.
854 */
855 (AH->SetupWorkerPtr) ((Archive *) AH);
856
857 /*
858 * Execute commands until done.
859 */
861
862 /*
863 * Disconnect from database and clean up.
864 */
867 DeCloneArchive(AH);
868}
869
870/*
871 * Thread base function for Windows
872 */
873#ifdef WIN32
874static unsigned __stdcall
876{
877 ArchiveHandle *AH = wi->AH;
878 ParallelSlot *slot = wi->slot;
879
880 /* Don't need WorkerInfo anymore */
881 free(wi);
882
883 /* Run the worker ... */
884 RunWorker(AH, slot);
885
886 /* Exit the thread */
887 _endthreadex(0);
888 return 0;
889}
890#endif /* WIN32 */
891
892/*
893 * This function starts a parallel dump or restore by spawning off the worker
894 * processes. For Windows, it creates a number of threads; on Unix the
895 * workers are created with fork().
896 */
899{
900 ParallelState *pstate;
901 int i;
902
903 Assert(AH->public.numWorkers > 0);
904
905 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
906
907 pstate->numWorkers = AH->public.numWorkers;
908 pstate->te = NULL;
909 pstate->parallelSlot = NULL;
910
911 if (AH->public.numWorkers == 1)
912 return pstate;
913
914 /* Create status arrays, being sure to initialize all fields to 0 */
915 pstate->te = (TocEntry **)
916 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
917 pstate->parallelSlot = (ParallelSlot *)
918 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
919
920#ifdef WIN32
921 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
923#endif
924
925 /*
926 * Set the pstate in shutdown_info, to tell the exit handler that it must
927 * clean up workers as well as the main database connection. But we don't
928 * set this in signal_info yet, because we don't want child processes to
929 * inherit non-NULL signal_info.pstate.
930 */
931 shutdown_info.pstate = pstate;
932
933 /*
934 * Temporarily disable query cancellation on the leader connection. This
935 * ensures that child processes won't inherit valid AH->connCancel
936 * settings and thus won't try to issue cancels against the leader's
937 * connection. No harm is done if we fail while it's disabled, because
938 * the leader connection is idle at this point anyway.
939 */
941
942 /* Ensure stdio state is quiesced before forking */
943 fflush(NULL);
944
945 /* Create desired number of workers */
946 for (i = 0; i < pstate->numWorkers; i++)
947 {
948#ifdef WIN32
949 WorkerInfo *wi;
950 uintptr_t handle;
951#else
952 pid_t pid;
953#endif
954 ParallelSlot *slot = &(pstate->parallelSlot[i]);
955 int pipeMW[2],
956 pipeWM[2];
957
958 /* Create communication pipes for this worker */
959 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
960 pg_fatal("could not create communication channels: %m");
961
962 /* leader's ends of the pipes */
963 slot->pipeRead = pipeWM[PIPE_READ];
964 slot->pipeWrite = pipeMW[PIPE_WRITE];
965 /* child's ends of the pipes */
968
969#ifdef WIN32
970 /* Create transient structure to pass args to worker function */
971 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
972
973 wi->AH = AH;
974 wi->slot = slot;
975
976 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
977 wi, 0, &(slot->threadId));
978 slot->hThread = handle;
979 slot->workerStatus = WRKR_IDLE;
980#else /* !WIN32 */
981 pid = fork();
982 if (pid == 0)
983 {
984 /* we are the worker */
985 int j;
986
987 /* this is needed for GetMyPSlot() */
988 slot->pid = getpid();
989
990 /* instruct signal handler that we're in a worker now */
991 signal_info.am_worker = true;
992
993 /* close read end of Worker -> Leader */
995 /* close write end of Leader -> Worker */
997
998 /*
999 * Close all inherited fds for communication of the leader with
1000 * previously-forked workers.
1001 */
1002 for (j = 0; j < i; j++)
1003 {
1006 }
1007
1008 /* Run the worker ... */
1009 RunWorker(AH, slot);
1010
1011 /* We can just exit(0) when done */
1012 exit(0);
1013 }
1014 else if (pid < 0)
1015 {
1016 /* fork failed */
1017 pg_fatal("could not create worker process: %m");
1018 }
1019
1020 /* In Leader after successful fork */
1021 slot->pid = pid;
1022 slot->workerStatus = WRKR_IDLE;
1023
1024 /* close read end of Leader -> Worker */
1026 /* close write end of Worker -> Leader */
1028#endif /* WIN32 */
1029 }
1030
1031 /*
1032 * Having forked off the workers, disable SIGPIPE so that leader isn't
1033 * killed if it tries to send a command to a dead worker. We don't want
1034 * the workers to inherit this setting, though.
1035 */
1036#ifndef WIN32
1038#endif
1039
1040 /*
1041 * Re-establish query cancellation on the leader connection.
1042 */
1044
1045 /*
1046 * Tell the cancel signal handler to forward signals to worker processes,
1047 * too. (As with query cancel, we did not need this earlier because the
1048 * workers have not yet been given anything to do; if we die before this
1049 * point, any already-started workers will see EOF and quit promptly.)
1050 */
1051 set_cancel_pstate(pstate);
1052
1053 return pstate;
1054}
1055
1056/*
1057 * Close down a parallel dump or restore.
1058 */
1059void
1061{
1062 int i;
1063
1064 /* No work if non-parallel */
1065 if (pstate->numWorkers == 1)
1066 return;
1067
1068 /* There should not be any unfinished jobs */
1069 Assert(IsEveryWorkerIdle(pstate));
1070
1071 /* Close the sockets so that the workers know they can exit */
1072 for (i = 0; i < pstate->numWorkers; i++)
1073 {
1076 }
1077
1078 /* Wait for them to exit */
1080
1081 /*
1082 * Unlink pstate from shutdown_info, so the exit handler will not try to
1083 * use it; and likewise unlink from signal_info.
1084 */
1087
1088 /* Release state (mere neatnik-ism, since we're about to terminate) */
1089 free(pstate->te);
1090 free(pstate->parallelSlot);
1091 free(pstate);
1092}
1093
1094/*
1095 * These next four functions handle construction and parsing of the command
1096 * strings and response strings for parallel workers.
1097 *
1098 * Currently, these can be the same regardless of which archive format we are
1099 * processing. In future, we might want to let format modules override these
1100 * functions to add format-specific data to a command or response.
1101 */
1102
1103/*
1104 * buildWorkerCommand: format a command string to send to a worker.
1105 *
1106 * The string is built in the caller-supplied buffer of size buflen.
1107 */
1108static void
1110 char *buf, int buflen)
1111{
1112 if (act == ACT_DUMP)
1113 snprintf(buf, buflen, "DUMP %d", te->dumpId);
1114 else if (act == ACT_RESTORE)
1115 snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1116 else
1117 Assert(false);
1118}
1119
1120/*
1121 * parseWorkerCommand: interpret a command string in a worker.
1122 */
1123static void
1125 const char *msg)
1126{
1127 DumpId dumpId;
1128 int nBytes;
1129
1130 if (messageStartsWith(msg, "DUMP "))
1131 {
1132 *act = ACT_DUMP;
1133 sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1134 Assert(nBytes == strlen(msg));
1135 *te = getTocEntryByDumpId(AH, dumpId);
1136 Assert(*te != NULL);
1137 }
1138 else if (messageStartsWith(msg, "RESTORE "))
1139 {
1140 *act = ACT_RESTORE;
1141 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1142 Assert(nBytes == strlen(msg));
1143 *te = getTocEntryByDumpId(AH, dumpId);
1144 Assert(*te != NULL);
1145 }
1146 else
1147 pg_fatal("unrecognized command received from leader: \"%s\"",
1148 msg);
1149}
1150
1151/*
1152 * buildWorkerResponse: format a response string to send to the leader.
1153 *
1154 * The string is built in the caller-supplied buffer of size buflen.
1155 */
1156static void
1158 char *buf, int buflen)
1159{
1160 snprintf(buf, buflen, "OK %d %d %d",
1161 te->dumpId,
1162 status,
1163 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1164}
1165
1166/*
1167 * parseWorkerResponse: parse the status message returned by a worker.
1168 *
1169 * Returns the integer status code, and may update fields of AH and/or te.
1170 */
1171static int
1173 const char *msg)
1174{
1175 DumpId dumpId;
1176 int nBytes,
1177 n_errors;
1178 int status = 0;
1179
1180 if (messageStartsWith(msg, "OK "))
1181 {
1182 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1183
1184 Assert(dumpId == te->dumpId);
1185 Assert(nBytes == strlen(msg));
1186
1187 AH->public.n_errors += n_errors;
1188 }
1189 else
1190 pg_fatal("invalid message received from worker: \"%s\"",
1191 msg);
1192
1193 return status;
1194}
1195
1196/*
1197 * Dispatch a job to some free worker.
1198 *
1199 * te is the TocEntry to be processed, act is the action to be taken on it.
1200 * callback is the function to call on completion of the job.
1201 *
1202 * If no worker is currently available, this will block, and previously
1203 * registered callback functions may be called.
1204 */
1205void
1207 ParallelState *pstate,
1208 TocEntry *te,
1209 T_Action act,
1211 void *callback_data)
1212{
1213 int worker;
1214 char buf[256];
1215
1216 /* Get a worker, waiting if none are idle */
1217 while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1218 WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1219
1220 /* Construct and send command string */
1221 buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1222
1223 sendMessageToWorker(pstate, worker, buf);
1224
1225 /* Remember worker is busy, and which TocEntry it's working on */
1226 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1227 pstate->parallelSlot[worker].callback = callback;
1228 pstate->parallelSlot[worker].callback_data = callback_data;
1229 pstate->te[worker] = te;
1230}
1231
1232/*
1233 * Find an idle worker and return its slot number.
1234 * Return NO_SLOT if none are idle.
1235 */
1236static int
1238{
1239 int i;
1240
1241 for (i = 0; i < pstate->numWorkers; i++)
1242 {
1243 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1244 return i;
1245 }
1246 return NO_SLOT;
1247}
1248
1249/*
1250 * Return true iff no worker is running.
1251 */
1252static bool
1254{
1255 int i;
1256
1257 for (i = 0; i < pstate->numWorkers; i++)
1258 {
1260 return false;
1261 }
1262 return true;
1263}
1264
1265/*
1266 * Return true iff every worker is in the WRKR_IDLE state.
1267 */
1268bool
1270{
1271 int i;
1272
1273 for (i = 0; i < pstate->numWorkers; i++)
1274 {
1275 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1276 return false;
1277 }
1278 return true;
1279}
1280
1281/*
1282 * Acquire lock on a table to be dumped by a worker process.
1283 *
1284 * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1285 * it's no problem for a worker to get one too, but if anything else besides
1286 * pg_dump is running, there's a possible deadlock:
1287 *
1288 * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1289 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1290 * because the leader holds a conflicting ACCESS SHARE lock).
1291 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1292 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1293 * 4) Now we have a deadlock, since the leader is effectively waiting for
1294 * the worker. The server cannot detect that, however.
1295 *
1296 * To prevent an infinite wait, prior to touching a table in a worker, request
1297 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1298 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1299 * so we have a deadlock. We must fail the backup in that case.
1300 */
1301static void
1303{
1304 const char *qualId;
1305 PQExpBuffer query;
1306 PGresult *res;
1307
1308 /* Nothing to do for BLOBS */
1309 if (strcmp(te->desc, "BLOBS") == 0)
1310 return;
1311
1312 query = createPQExpBuffer();
1313
1314 qualId = fmtQualifiedId(te->namespace, te->tag);
1315
1316 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1317 qualId);
1318
1319 res = PQexec(AH->connection, query->data);
1320
1321 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1322 pg_fatal("could not obtain lock on relation \"%s\"\n"
1323 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1324 "on the table after the pg_dump parent process had gotten the "
1325 "initial ACCESS SHARE lock on the table.", qualId);
1326
1327 PQclear(res);
1328 destroyPQExpBuffer(query);
1329}
1330
1331/*
1332 * WaitForCommands: main routine for a worker process.
1333 *
1334 * Read and execute commands from the leader until we see EOF on the pipe.
1335 */
1336static void
1338{
1339 char *command;
1340 TocEntry *te;
1341 T_Action act;
1342 int status = 0;
1343 char buf[256];
1344
1345 for (;;)
1346 {
1347 if (!(command = getMessageFromLeader(pipefd)))
1348 {
1349 /* EOF, so done */
1350 return;
1351 }
1352
1353 /* Decode the command */
1354 parseWorkerCommand(AH, &te, &act, command);
1355
1356 if (act == ACT_DUMP)
1357 {
1358 /* Acquire lock on this table within the worker's session */
1359 lockTableForWorker(AH, te);
1360
1361 /* Perform the dump command */
1362 status = (AH->WorkerJobDumpPtr) (AH, te);
1363 }
1364 else if (act == ACT_RESTORE)
1365 {
1366 /* Perform the restore command */
1367 status = (AH->WorkerJobRestorePtr) (AH, te);
1368 }
1369 else
1370 Assert(false);
1371
1372 /* Return status to leader */
1373 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1374
1376
1377 /* command was pg_malloc'd and we are responsible for free()ing it. */
1378 free(command);
1379 }
1380}
1381
1382/*
1383 * Check for status messages from workers.
1384 *
1385 * If do_wait is true, wait to get a status message; otherwise, just return
1386 * immediately if there is none available.
1387 *
1388 * When we get a status message, we pass the status code to the callback
1389 * function that was specified to DispatchJobForTocEntry, then reset the
1390 * worker status to IDLE.
1391 *
1392 * Returns true if we collected a status message, else false.
1393 *
1394 * XXX is it worth checking for more than one status message per call?
1395 * It seems somewhat unlikely that multiple workers would finish at exactly
1396 * the same time.
1397 */
1398static bool
1400{
1401 int worker;
1402 char *msg;
1403
1404 /* Try to collect a status message */
1405 msg = getMessageFromWorker(pstate, do_wait, &worker);
1406
1407 if (!msg)
1408 {
1409 /* If do_wait is true, we must have detected EOF on some socket */
1410 if (do_wait)
1411 pg_fatal("a worker process died unexpectedly");
1412 return false;
1413 }
1414
1415 /* Process it and update our idea of the worker's status */
1416 if (messageStartsWith(msg, "OK "))
1417 {
1418 ParallelSlot *slot = &pstate->parallelSlot[worker];
1419 TocEntry *te = pstate->te[worker];
1420 int status;
1421
1422 status = parseWorkerResponse(AH, te, msg);
1423 slot->callback(AH, te, status, slot->callback_data);
1424 slot->workerStatus = WRKR_IDLE;
1425 pstate->te[worker] = NULL;
1426 }
1427 else
1428 pg_fatal("invalid message received from worker: \"%s\"",
1429 msg);
1430
1431 /* Free the string returned from getMessageFromWorker */
1432 free(msg);
1433
1434 return true;
1435}
1436
1437/*
1438 * Check for status results from workers, waiting if necessary.
1439 *
1440 * Available wait modes are:
1441 * WFW_NO_WAIT: reap any available status, but don't block
1442 * WFW_GOT_STATUS: wait for at least one more worker to finish
1443 * WFW_ONE_IDLE: wait for at least one worker to be idle
1444 * WFW_ALL_IDLE: wait for all workers to be idle
1445 *
1446 * Any received results are passed to the callback specified to
1447 * DispatchJobForTocEntry.
1448 *
1449 * This function is executed in the leader process.
1450 */
1451void
1453{
1454 bool do_wait = false;
1455
1456 /*
1457 * In GOT_STATUS mode, always block waiting for a message, since we can't
1458 * return till we get something. In other modes, we don't block the first
1459 * time through the loop.
1460 */
1461 if (mode == WFW_GOT_STATUS)
1462 {
1463 /* Assert that caller knows what it's doing */
1464 Assert(!IsEveryWorkerIdle(pstate));
1465 do_wait = true;
1466 }
1467
1468 for (;;)
1469 {
1470 /*
1471 * Check for status messages, even if we don't need to block. We do
1472 * not try very hard to reap all available messages, though, since
1473 * there's unlikely to be more than one.
1474 */
1475 if (ListenToWorkers(AH, pstate, do_wait))
1476 {
1477 /*
1478 * If we got a message, we are done by definition for GOT_STATUS
1479 * mode, and we can also be certain that there's at least one idle
1480 * worker. So we're done in all but ALL_IDLE mode.
1481 */
1482 if (mode != WFW_ALL_IDLE)
1483 return;
1484 }
1485
1486 /* Check whether we must wait for new status messages */
1487 switch (mode)
1488 {
1489 case WFW_NO_WAIT:
1490 return; /* never wait */
1491 case WFW_GOT_STATUS:
1492 Assert(false); /* can't get here, because we waited */
1493 break;
1494 case WFW_ONE_IDLE:
1495 if (GetIdleWorker(pstate) != NO_SLOT)
1496 return;
1497 break;
1498 case WFW_ALL_IDLE:
1499 if (IsEveryWorkerIdle(pstate))
1500 return;
1501 break;
1502 }
1503
1504 /* Loop back, and this time wait for something to happen */
1505 do_wait = true;
1506 }
1507}
1508
1509/*
1510 * Read one command message from the leader, blocking if necessary
1511 * until one is available, and return it as a malloc'd string.
1512 * On EOF, return NULL.
1513 *
1514 * This function is executed in worker processes.
1515 */
1516static char *
1518{
1520}
1521
1522/*
1523 * Send a status message to the leader.
1524 *
1525 * This function is executed in worker processes.
1526 */
1527static void
1528sendMessageToLeader(int pipefd[2], const char *str)
1529{
1530 int len = strlen(str) + 1;
1531
1532 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1533 pg_fatal("could not write to the communication channel: %m");
1534}
1535
1536/*
1537 * Wait until some descriptor in "workerset" becomes readable.
1538 * Returns -1 on error, else the number of readable descriptors.
1539 */
1540static int
1542{
1543 int i;
1545
1546 for (;;)
1547 {
1548 *workerset = saveSet;
1549 i = select(maxFd + 1, workerset, NULL, NULL, NULL);
1550
1551#ifndef WIN32
1552 if (i < 0 && errno == EINTR)
1553 continue;
1554#else
1555 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1556 continue;
1557#endif
1558 break;
1559 }
1560
1561 return i;
1562}
1563
1564
1565/*
1566 * Check for messages from worker processes.
1567 *
1568 * If a message is available, return it as a malloc'd string, and put the
1569 * index of the sending worker in *worker.
1570 *
1571 * If nothing is available, wait if "do_wait" is true, else return NULL.
1572 *
1573 * If we detect EOF on any socket, we'll return NULL. It's not great that
1574 * that's hard to distinguish from the no-data-available case, but for now
1575 * our one caller is okay with that.
1576 *
1577 * This function is executed in the leader process.
1578 */
1579static char *
1580getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1581{
1582 int i;
1584 int maxFd = -1;
1585 struct timeval nowait = {0, 0};
1586
1587 /* construct bitmap of socket descriptors for select() */
1589 for (i = 0; i < pstate->numWorkers; i++)
1590 {
1592 continue;
1594 if (pstate->parallelSlot[i].pipeRead > maxFd)
1595 maxFd = pstate->parallelSlot[i].pipeRead;
1596 }
1597
1598 if (do_wait)
1599 {
1601 Assert(i != 0);
1602 }
1603 else
1604 {
1605 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1606 return NULL;
1607 }
1608
1609 if (i < 0)
1610 pg_fatal("%s() failed: %m", "select");
1611
1612 for (i = 0; i < pstate->numWorkers; i++)
1613 {
1614 char *msg;
1615
1617 continue;
1618 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1619 continue;
1620
1621 /*
1622 * Read the message if any. If the socket is ready because of EOF,
1623 * we'll return NULL instead (and the socket will stay ready, so the
1624 * condition will persist).
1625 *
1626 * Note: because this is a blocking read, we'll wait if only part of
1627 * the message is available. Waiting a long time would be bad, but
1628 * since worker status messages are short and are always sent in one
1629 * operation, it shouldn't be a problem in practice.
1630 */
1632 *worker = i;
1633 return msg;
1634 }
1635 Assert(false);
1636 return NULL;
1637}
1638
1639/*
1640 * Send a command message to the specified worker process.
1641 *
1642 * This function is executed in the leader process.
1643 */
1644static void
1645sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1646{
1647 int len = strlen(str) + 1;
1648
1649 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1650 {
1651 pg_fatal("could not write to the communication channel: %m");
1652 }
1653}
1654
1655/*
1656 * Read one message from the specified pipe (fd), blocking if necessary
1657 * until one is available, and return it as a malloc'd string.
1658 * On EOF, return NULL.
1659 *
1660 * A "message" on the channel is just a null-terminated string.
1661 */
1662static char *
1664{
1665 char *msg;
1666 int msgsize,
1667 bufsize;
1668 int ret;
1669
1670 /*
1671 * In theory, if we let piperead() read multiple bytes, it might give us
1672 * back fragments of multiple messages. (That can't actually occur, since
1673 * neither leader nor workers send more than one message without waiting
1674 * for a reply, but we don't wish to assume that here.) For simplicity,
1675 * read a byte at a time until we get the terminating '\0'. This method
1676 * is a bit inefficient, but since this is only used for relatively short
1677 * command and status strings, it shouldn't matter.
1678 */
1679 bufsize = 64; /* could be any number */
1680 msg = (char *) pg_malloc(bufsize);
1681 msgsize = 0;
1682 for (;;)
1683 {
1685 ret = piperead(fd, msg + msgsize, 1);
1686 if (ret <= 0)
1687 break; /* error or connection closure */
1688
1689 Assert(ret == 1);
1690
1691 if (msg[msgsize] == '\0')
1692 return msg; /* collected whole message */
1693
1694 msgsize++;
1695 if (msgsize == bufsize) /* enlarge buffer if needed */
1696 {
1697 bufsize += 16; /* could be any number */
1698 msg = (char *) pg_realloc(msg, bufsize);
1699 }
1700 }
1701
1702 /* Other end has closed the connection */
1703 pg_free(msg);
1704 return NULL;
1705}
1706
1707#ifdef WIN32
1708
1709/*
1710 * This is a replacement version of pipe(2) for Windows which allows the pipe
1711 * handles to be used in select().
1712 *
1713 * Reads and writes on the pipe must go through piperead()/pipewrite().
1714 *
1715 * For consistency with Unix we declare the returned handles as "int".
1716 * This is okay even on WIN64 because system handles are not more than
1717 * 32 bits wide, but we do have to do some casting.
1718 */
1719static int
1720pgpipe(int handles[2])
1721{
1722 pgsocket s,
1723 tmp_sock;
1724 struct sockaddr_in serv_addr;
1725 int len = sizeof(serv_addr);
1726
1727 /* We have to use the Unix socket invalid file descriptor value here. */
1728 handles[0] = handles[1] = -1;
1729
1730 /*
1731 * setup listen socket
1732 */
1733 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1734 {
1735 pg_log_error("pgpipe: could not create socket: error code %d",
1736 WSAGetLastError());
1737 return -1;
1738 }
1739
1740 memset(&serv_addr, 0, sizeof(serv_addr));
1741 serv_addr.sin_family = AF_INET;
1742 serv_addr.sin_port = pg_hton16(0);
1743 serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1744 if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1745 {
1746 pg_log_error("pgpipe: could not bind: error code %d",
1747 WSAGetLastError());
1748 closesocket(s);
1749 return -1;
1750 }
1751 if (listen(s, 1) == SOCKET_ERROR)
1752 {
1753 pg_log_error("pgpipe: could not listen: error code %d",
1754 WSAGetLastError());
1755 closesocket(s);
1756 return -1;
1757 }
1758 if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1759 {
1760 pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
1761 WSAGetLastError());
1762 closesocket(s);
1763 return -1;
1764 }
1765
1766 /*
1767 * setup pipe handles
1768 */
1770 {
1771 pg_log_error("pgpipe: could not create second socket: error code %d",
1772 WSAGetLastError());
1773 closesocket(s);
1774 return -1;
1775 }
1776 handles[1] = (int) tmp_sock;
1777
1779 {
1780 pg_log_error("pgpipe: could not connect socket: error code %d",
1781 WSAGetLastError());
1782 closesocket(handles[1]);
1783 handles[1] = -1;
1784 closesocket(s);
1785 return -1;
1786 }
1787 if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1788 {
1789 pg_log_error("pgpipe: could not accept connection: error code %d",
1790 WSAGetLastError());
1791 closesocket(handles[1]);
1792 handles[1] = -1;
1793 closesocket(s);
1794 return -1;
1795 }
1796 handles[0] = (int) tmp_sock;
1797
1798 closesocket(s);
1799 return 0;
1800}
1801
1802#endif /* WIN32 */
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
Definition parallel.c:1061
static void sendMessageToLeader(int pipefd[2], const char *str)
Definition parallel.c:1529
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
Definition parallel.c:266
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
Definition parallel.c:1338
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
Definition parallel.c:1453
@ WRKR_WORKING
Definition parallel.c:81
@ WRKR_IDLE
Definition parallel.c:80
@ WRKR_TERMINATED
Definition parallel.c:82
static bool HasEveryWorkerTerminated(ParallelState *pstate)
Definition parallel.c:1254
#define pgpipe(a)
Definition parallel.c:139
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
Definition parallel.c:1400
static void sigTermHandler(SIGNAL_ARGS)
Definition parallel.c:547
#define PIPE_READ
Definition parallel.c:71
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
Definition parallel.c:899
static char * readMessageFromPipe(int fd)
Definition parallel.c:1664
static int select_loop(int maxFd, fd_set *workerset)
Definition parallel.c:1542
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
Definition parallel.c:1173
static int GetIdleWorker(ParallelState *pstate)
Definition parallel.c:1238
static void set_cancel_pstate(ParallelState *pstate)
Definition parallel.c:791
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
Definition parallel.c:831
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
Definition parallel.c:811
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
Definition parallel.c:1110
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
Definition parallel.c:1581
static void archive_close_connection(int code, void *arg)
Definition parallel.c:341
#define NO_SLOT
Definition parallel.c:74
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
Definition parallel.c:1646
#define PIPE_WRITE
Definition parallel.c:72
static ShutdownInformation shutdown_info
Definition parallel.c:154
void on_exit_close_archive(Archive *AHX)
Definition parallel.c:330
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
Definition parallel.c:1207
#define WORKER_IS_RUNNING(workerStatus)
Definition parallel.c:85
static char * getMessageFromLeader(int pipefd[2])
Definition parallel.c:1518
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
Definition parallel.c:1303
#define piperead(a, b, c)
Definition parallel.c:140
#define pipewrite(a, b, c)
Definition parallel.c:141
static void set_cancel_handler(void)
Definition parallel.c:610
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
Definition parallel.c:1158
static volatile DumpSignalInformation signal_info
Definition parallel.c:175
bool IsEveryWorkerIdle(ParallelState *pstate)
Definition parallel.c:1270
#define write_stderr(str)
Definition parallel.c:186
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
Definition parallel.c:1125
#define messageStartsWith(msg, prefix)
Definition parallel.c:228
static void ShutdownWorkersHard(ParallelState *pstate)
Definition parallel.c:397
static void WaitForTerminatingWorkers(ParallelState *pstate)
Definition parallel.c:448
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
Definition parallel.c:732
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
Definition parallel.h:24
WFW_WaitOption
Definition parallel.h:31
@ WFW_ALL_IDLE
Definition parallel.h:35
@ WFW_GOT_STATUS
Definition parallel.h:33
@ WFW_NO_WAIT
Definition parallel.h:32
@ WFW_ONE_IDLE
Definition parallel.h:34
#define SIGNAL_ARGS
Definition c.h:1363
#define Assert(condition)
Definition c.h:873
void err(int eval, const char *fmt,...)
Definition err.c:43
PGcancel * PQgetCancel(PGconn *conn)
Definition fe-cancel.c:368
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
Definition fe-cancel.c:548
void PQfreeCancel(PGcancel *cancel)
Definition fe-cancel.c:502
PGresult * PQexec(PGconn *conn, const char *query)
Definition fe-exec.c:2279
void * pg_malloc(size_t size)
Definition fe_memutils.c:47
void * pg_malloc0(size_t size)
Definition fe_memutils.c:53
void pg_free(void *ptr)
void * pg_realloc(void *ptr, size_t size)
Definition fe_memutils.c:65
const char * str
#define bufsize
int j
Definition isn.c:78
int i
Definition isn.c:77
#define PQclear
#define PQresultStatus
@ PGRES_COMMAND_OK
Definition libpq-fe.h:125
#define pg_log_error(...)
Definition logging.h:106
const char * progname
Definition main.c:44
int DumpId
Definition pg_backup.h:285
void DisconnectDatabase(Archive *AHX)
void DeCloneArchive(ArchiveHandle *AH)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
#define WORKER_IGNORED_ERRORS
@ ACT_RESTORE
void * arg
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
#define pg_fatal(...)
#define pg_hton32(x)
Definition pg_bswap.h:121
#define pg_hton16(x)
Definition pg_bswap.h:120
static PgChecksumMode mode
const void size_t len
static bool do_wait
Definition pg_ctl.c:76
static char buf[DEFAULT_XLOG_SEG_SIZE]
#define pqsignal
Definition port.h:547
int pgsocket
Definition port.h:29
#define snprintf
Definition port.h:260
#define PGINVALID_SOCKET
Definition port.h:31
#define closesocket
Definition port.h:397
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
PQExpBufferData * PQExpBuffer
Definition pqexpbuffer.h:51
static int fd(const char *x, int i)
static int fb(int x)
#define free(a)
PGconn * conn
Definition streamutil.c:52
const char * fmtQualifiedId(const char *schema, const char *id)
PQExpBuffer(* getLocalPQExpBuffer)(void)
int n_errors
Definition pg_backup.h:253
int numWorkers
Definition pg_backup.h:240
ArchiveHandle * myAH
Definition parallel.c:167
ParallelState * pstate
Definition parallel.c:168
ParallelCompletionPtr callback
Definition parallel.c:100
ArchiveHandle * AH
Definition parallel.c:103
void * callback_data
Definition parallel.c:101
T_WorkerStatus workerStatus
Definition parallel.c:97
int pipeRevRead
Definition parallel.c:107
int pipeRevWrite
Definition parallel.c:108
TocEntry ** te
Definition parallel.h:59
ParallelSlot * parallelSlot
Definition parallel.h:60
ParallelState * pstate
Definition parallel.c:150
WorkerJobDumpPtrType WorkerJobDumpPtr
PGcancel *volatile connCancel
WorkerJobRestorePtrType WorkerJobRestorePtr
SetupWorkerPtrType SetupWorkerPtr
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
#define bind(s, addr, addrlen)
Definition win32_port.h:496
#define EINTR
Definition win32_port.h:361
#define SIGPIPE
Definition win32_port.h:163
#define SIGQUIT
Definition win32_port.h:159
#define kill(pid, sig)
Definition win32_port.h:490
#define socket(af, type, protocol)
Definition win32_port.h:495
#define accept(s, addr, addrlen)
Definition win32_port.h:498
#define connect(s, name, namelen)
Definition win32_port.h:499
#define listen(s, backlog)
Definition win32_port.h:497
#define select(n, r, w, e, timeout)
Definition win32_port.h:500

◆ NO_SLOT

#define NO_SLOT   (-1) /* Failure result for GetIdleWorker() */

Definition at line 74 of file parallel.c.

◆ pgpipe

#define pgpipe (   a)    pipe(a)

Definition at line 139 of file parallel.c.

◆ PIPE_READ

#define PIPE_READ   0

Definition at line 71 of file parallel.c.

◆ PIPE_WRITE

#define PIPE_WRITE   1

Definition at line 72 of file parallel.c.

◆ piperead

#define piperead (   a,
  b,
  c 
)    read(a,b,c)

Definition at line 140 of file parallel.c.

◆ pipewrite

#define pipewrite (   a,
  b,
  c 
)    write(a,b,c)

Definition at line 141 of file parallel.c.

◆ WORKER_IS_RUNNING

#define WORKER_IS_RUNNING (   workerStatus)     ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)

Definition at line 85 of file parallel.c.

◆ write_stderr

#define write_stderr (   str)
Value:
do { \
const char *str_ = (str); \
int rc_; \
rc_ = write(fileno(stderr), str_, strlen(str_)); \
(void) rc_; \
} while (0)
#define write(a, b, c)
Definition win32.h:14

Definition at line 186 of file parallel.c.

187 { \
188 const char *str_ = (str); \
189 int rc_; \
190 rc_ = write(fileno(stderr), str_, strlen(str_)); \
191 (void) rc_; \
192 } while (0)

Typedef Documentation

◆ DumpSignalInformation

◆ ShutdownInformation

Enumeration Type Documentation

◆ T_WorkerStatus

Enumerator
WRKR_NOT_STARTED 
WRKR_IDLE 
WRKR_WORKING 
WRKR_TERMINATED 

Definition at line 77 of file parallel.c.

78{
T_WorkerStatus
Definition parallel.c:78
@ WRKR_NOT_STARTED
Definition parallel.c:79

Function Documentation

◆ archive_close_connection()

static void archive_close_connection ( int  code,
void *arg 
)
static

Definition at line 341 of file parallel.c.

342{
344
345 if (si->pstate)
346 {
347 /* In parallel mode, must figure out who we are */
348 ParallelSlot *slot = GetMyPSlot(si->pstate);
349
350 if (!slot)
351 {
352 /*
353 * We're the leader. Forcibly shut down workers, then close our
354 * own database connection, if any.
355 */
356 ShutdownWorkersHard(si->pstate);
357
358 if (si->AHX)
360 }
361 else
362 {
363 /*
364 * We're a worker. Shut down our own DB connection if any. On
365 * Windows, we also have to close our communication sockets, to
366 * emulate what will happen on Unix when the worker process exits.
367 * (Without this, if this is a premature exit, the leader would
368 * fail to detect it because there would be no EOF condition on
369 * the other end of the pipe.)
370 */
371 if (slot->AH)
372 DisconnectDatabase(&(slot->AH->public));
373
374#ifdef WIN32
377#endif
378 }
379 }
380 else
381 {
382 /* Non-parallel operation: just kill the leader DB connection */
383 if (si->AHX)
385 }
386}

References ParallelSlot::AH, arg, closesocket, DisconnectDatabase(), fb(), GetMyPSlot(), ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, _archiveHandle::public, and ShutdownWorkersHard().

Referenced by on_exit_close_archive().

◆ buildWorkerCommand()

static void buildWorkerCommand ( ArchiveHandle *AH,
TocEntry *te,
T_Action  act,
char *buf,
int  buflen 
)
static

Definition at line 1110 of file parallel.c.

1112{
1113 if (act == ACT_DUMP)
1114 snprintf(buf, buflen, "DUMP %d", te->dumpId);
1115 else if (act == ACT_RESTORE)
1116 snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1117 else
1118 Assert(false);
1119}

References ACT_DUMP, ACT_RESTORE, Assert, buf, _tocEntry::dumpId, fb(), and snprintf.

Referenced by DispatchJobForTocEntry().

◆ buildWorkerResponse()

static void buildWorkerResponse ( ArchiveHandle *AH,
TocEntry *te,
T_Action  act,
int  status,
char *buf,
int  buflen 
)
static

Definition at line 1158 of file parallel.c.

1160{
1161 snprintf(buf, buflen, "OK %d %d %d",
1162 te->dumpId,
1163 status,
1164 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1165}

References buf, _tocEntry::dumpId, Archive::n_errors, _archiveHandle::public, snprintf, and WORKER_IGNORED_ERRORS.

Referenced by WaitForCommands().

◆ DispatchJobForTocEntry()

void DispatchJobForTocEntry ( ArchiveHandle *AH,
ParallelState *pstate,
TocEntry *te,
T_Action  act,
ParallelCompletionPtr  callback,
void *callback_data 
)

Definition at line 1207 of file parallel.c.

1213{
1214 int worker;
1215 char buf[256];
1216
1217 /* Get a worker, waiting if none are idle */
1218 while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1219 WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1220
1221 /* Construct and send command string */
1222 buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1223
1224 sendMessageToWorker(pstate, worker, buf);
1225
1226 /* Remember worker is busy, and which TocEntry it's working on */
1227 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1228 pstate->parallelSlot[worker].callback = callback;
1229 pstate->parallelSlot[worker].callback_data = callback_data;
1230 pstate->te[worker] = te;
1231}

References buf, buildWorkerCommand(), ParallelSlot::callback, callback(), ParallelSlot::callback_data, fb(), GetIdleWorker(), NO_SLOT, ParallelState::parallelSlot, sendMessageToWorker(), ParallelState::te, WaitForWorkers(), WFW_ONE_IDLE, ParallelSlot::workerStatus, and WRKR_WORKING.

Referenced by restore_toc_entries_parallel(), and WriteDataChunks().

◆ GetIdleWorker()

static int GetIdleWorker ( ParallelState *pstate)
static

Definition at line 1238 of file parallel.c.

1239{
1240 int i;
1241
1242 for (i = 0; i < pstate->numWorkers; i++)
1243 {
1244 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1245 return i;
1246 }
1247 return NO_SLOT;
1248}

References i, NO_SLOT, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by DispatchJobForTocEntry(), and WaitForWorkers().

◆ getMessageFromLeader()

static char * getMessageFromLeader ( int  pipefd[2])
static

Definition at line 1518 of file parallel.c.

1519{
1521}

References fb(), PIPE_READ, and readMessageFromPipe().

Referenced by WaitForCommands().

◆ getMessageFromWorker()

static char * getMessageFromWorker ( ParallelState *pstate,
bool  do_wait,
int *worker 
)
static

Definition at line 1581 of file parallel.c.

1582{
1583 int i;
1585 int maxFd = -1;
1586 struct timeval nowait = {0, 0};
1587
1588 /* construct bitmap of socket descriptors for select() */
1590 for (i = 0; i < pstate->numWorkers; i++)
1591 {
1593 continue;
1595 if (pstate->parallelSlot[i].pipeRead > maxFd)
1596 maxFd = pstate->parallelSlot[i].pipeRead;
1597 }
1598
1599 if (do_wait)
1600 {
1602 Assert(i != 0);
1603 }
1604 else
1605 {
1606 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1607 return NULL;
1608 }
1609
1610 if (i < 0)
1611 pg_fatal("%s() failed: %m", "select");
1612
1613 for (i = 0; i < pstate->numWorkers; i++)
1614 {
1615 char *msg;
1616
1618 continue;
1619 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1620 continue;
1621
1622 /*
1623 * Read the message if any. If the socket is ready because of EOF,
1624 * we'll return NULL instead (and the socket will stay ready, so the
1625 * condition will persist).
1626 *
1627 * Note: because this is a blocking read, we'll wait if only part of
1628 * the message is available. Waiting a long time would be bad, but
1629 * since worker status messages are short and are always sent in one
1630 * operation, it shouldn't be a problem in practice.
1631 */
1633 *worker = i;
1634 return msg;
1635 }
1636 Assert(false);
1637 return NULL;
1638}

References Assert, do_wait, fb(), i, ParallelState::numWorkers, ParallelState::parallelSlot, pg_fatal, ParallelSlot::pipeRead, readMessageFromPipe(), select, select_loop(), WORKER_IS_RUNNING, and ParallelSlot::workerStatus.

Referenced by ListenToWorkers().

◆ GetMyPSlot()

static ParallelSlot * GetMyPSlot ( ParallelState *pstate)
static

Definition at line 266 of file parallel.c.

267{
268 int i;
269
270 for (i = 0; i < pstate->numWorkers; i++)
271 {
272#ifdef WIN32
273 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
274#else
275 if (pstate->parallelSlot[i].pid == getpid())
276#endif
277 return &(pstate->parallelSlot[i]);
278 }
279
280 return NULL;
281}

References fb(), i, ParallelState::numWorkers, ParallelState::parallelSlot, and ParallelSlot::pid.

Referenced by archive_close_connection().

◆ HasEveryWorkerTerminated()

static bool HasEveryWorkerTerminated ( ParallelState *pstate)
static

Definition at line 1254 of file parallel.c.

1255{
1256 int i;
1257
1258 for (i = 0; i < pstate->numWorkers; i++)
1259 {
1261 return false;
1262 }
1263 return true;
1264}

References i, ParallelState::numWorkers, ParallelState::parallelSlot, WORKER_IS_RUNNING, and ParallelSlot::workerStatus.

Referenced by WaitForTerminatingWorkers().

◆ init_parallel_dump_utils()

void init_parallel_dump_utils ( void  )

Definition at line 238 of file parallel.c.

239{
240#ifdef WIN32
242 {
244 int err;
245
246 /* Prepare for threaded operation */
249
250 /* Initialize socket access */
251 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
252 if (err != 0)
253 pg_fatal("%s() failed: error code %d", "WSAStartup", err);
254
255 parallel_init_done = true;
256 }
257#endif
258}

References err(), fb(), and pg_fatal.

Referenced by main().

◆ IsEveryWorkerIdle()

bool IsEveryWorkerIdle ( ParallelState *pstate)

Definition at line 1270 of file parallel.c.

1271{
1272 int i;
1273
1274 for (i = 0; i < pstate->numWorkers; i++)
1275 {
1276 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1277 return false;
1278 }
1279 return true;
1280}

References i, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by ParallelBackupEnd(), restore_toc_entries_parallel(), and WaitForWorkers().

◆ ListenToWorkers()

static bool ListenToWorkers ( ArchiveHandle *AH,
ParallelState *pstate,
bool  do_wait 
)
static

Definition at line 1400 of file parallel.c.

1401{
1402 int worker;
1403 char *msg;
1404
1405 /* Try to collect a status message */
1406 msg = getMessageFromWorker(pstate, do_wait, &worker);
1407
1408 if (!msg)
1409 {
1410 /* If do_wait is true, we must have detected EOF on some socket */
1411 if (do_wait)
1412 pg_fatal("a worker process died unexpectedly");
1413 return false;
1414 }
1415
1416 /* Process it and update our idea of the worker's status */
1417 if (messageStartsWith(msg, "OK "))
1418 {
1419 ParallelSlot *slot = &pstate->parallelSlot[worker];
1420 TocEntry *te = pstate->te[worker];
1421 int status;
1422
1423 status = parseWorkerResponse(AH, te, msg);
1424 slot->callback(AH, te, status, slot->callback_data);
1425 slot->workerStatus = WRKR_IDLE;
1426 pstate->te[worker] = NULL;
1427 }
1428 else
1429 pg_fatal("invalid message received from worker: \"%s\"",
1430 msg);
1431
1432 /* Free the string returned from getMessageFromWorker */
1433 free(msg);
1434
1435 return true;
1436}

References ParallelSlot::callback, ParallelSlot::callback_data, do_wait, fb(), free, getMessageFromWorker(), messageStartsWith, ParallelState::parallelSlot, parseWorkerResponse(), pg_fatal, ParallelState::te, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by WaitForWorkers().

◆ lockTableForWorker()

static void lockTableForWorker ( ArchiveHandle *AH,
TocEntry *te 
)
static

Definition at line 1303 of file parallel.c.

1304{
1305 const char *qualId;
1306 PQExpBuffer query;
1307 PGresult *res;
1308
1309 /* Nothing to do for BLOBS */
1310 if (strcmp(te->desc, "BLOBS") == 0)
1311 return;
1312
1313 query = createPQExpBuffer();
1314
1315 qualId = fmtQualifiedId(te->namespace, te->tag);
1316
1317 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1318 qualId);
1319
1320 res = PQexec(AH->connection, query->data);
1321
1322 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1323 pg_fatal("could not obtain lock on relation \"%s\"\n"
1324 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1325 "on the table after the pg_dump parent process had gotten the "
1326 "initial ACCESS SHARE lock on the table.", qualId);
1327
1328 PQclear(res);
1329 destroyPQExpBuffer(query);
1330}

References appendPQExpBuffer(), _archiveHandle::connection, createPQExpBuffer(), PQExpBufferData::data, _tocEntry::desc, destroyPQExpBuffer(), fb(), fmtQualifiedId(), pg_fatal, PGRES_COMMAND_OK, PQclear, PQexec(), PQresultStatus, and _tocEntry::tag.

Referenced by WaitForCommands().

◆ on_exit_close_archive()

void on_exit_close_archive ( Archive *AHX)

◆ ParallelBackupEnd()

void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)

Definition at line 1061 of file parallel.c.

1062{
1063 int i;
1064
1065 /* No work if non-parallel */
1066 if (pstate->numWorkers == 1)
1067 return;
1068
1069 /* There should not be any unfinished jobs */
1070 Assert(IsEveryWorkerIdle(pstate));
1071
1072 /* Close the sockets so that the workers know they can exit */
1073 for (i = 0; i < pstate->numWorkers; i++)
1074 {
1077 }
1078
1079 /* Wait for them to exit */
1081
1082 /*
1083 * Unlink pstate from shutdown_info, so the exit handler will not try to
1084 * use it; and likewise unlink from signal_info.
1085 */
1088
1089 /* Release state (mere neatnik-ism, since we're about to terminate) */
1090 free(pstate->te);
1091 free(pstate->parallelSlot);
1092 free(pstate);
1093}

References Assert, closesocket, fb(), free, i, IsEveryWorkerIdle(), ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pipeRead, ParallelSlot::pipeWrite, ShutdownInformation::pstate, set_cancel_pstate(), shutdown_info, ParallelState::te, and WaitForTerminatingWorkers().

Referenced by _CloseArchive(), and RestoreArchive().

◆ ParallelBackupStart()

ParallelState *ParallelBackupStart(ArchiveHandle *AH)

Definition at line 899 of file parallel.c.

900{
901 ParallelState *pstate;
902 int i;
903
904 Assert(AH->public.numWorkers > 0);
905
906 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
907
908 pstate->numWorkers = AH->public.numWorkers;
909 pstate->te = NULL;
910 pstate->parallelSlot = NULL;
911
912 if (AH->public.numWorkers == 1)
913 return pstate;
914
915 /* Create status arrays, being sure to initialize all fields to 0 */
916 pstate->te = (TocEntry **)
917 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
918 pstate->parallelSlot = (ParallelSlot *)
919 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
920
921#ifdef WIN32
922 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
924#endif
925
926 /*
927 * Set the pstate in shutdown_info, to tell the exit handler that it must
928 * clean up workers as well as the main database connection. But we don't
929 * set this in signal_info yet, because we don't want child processes to
930 * inherit non-NULL signal_info.pstate.
931 */
932 shutdown_info.pstate = pstate;
933
934 /*
935 * Temporarily disable query cancellation on the leader connection. This
936 * ensures that child processes won't inherit valid AH->connCancel
937 * settings and thus won't try to issue cancels against the leader's
938 * connection. No harm is done if we fail while it's disabled, because
939 * the leader connection is idle at this point anyway.
940 */
942
943 /* Ensure stdio state is quiesced before forking */
944 fflush(NULL);
945
946 /* Create desired number of workers */
947 for (i = 0; i < pstate->numWorkers; i++)
948 {
949#ifdef WIN32
950 WorkerInfo *wi;
951 uintptr_t handle;
952#else
953 pid_t pid;
954#endif
955 ParallelSlot *slot = &(pstate->parallelSlot[i]);
956 int pipeMW[2],
957 pipeWM[2];
958
959 /* Create communication pipes for this worker */
960 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
961 pg_fatal("could not create communication channels: %m");
962
963 /* leader's ends of the pipes */
964 slot->pipeRead = pipeWM[PIPE_READ];
965 slot->pipeWrite = pipeMW[PIPE_WRITE];
966 /* child's ends of the pipes */
969
970#ifdef WIN32
971 /* Create transient structure to pass args to worker function */
972 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
973
974 wi->AH = AH;
975 wi->slot = slot;
976
977 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
978 wi, 0, &(slot->threadId));
979 slot->hThread = handle;
980 slot->workerStatus = WRKR_IDLE;
981#else /* !WIN32 */
982 pid = fork();
983 if (pid == 0)
984 {
985 /* we are the worker */
986 int j;
987
988 /* this is needed for GetMyPSlot() */
989 slot->pid = getpid();
990
991 /* instruct signal handler that we're in a worker now */
992 signal_info.am_worker = true;
993
994 /* close read end of Worker -> Leader */
996 /* close write end of Leader -> Worker */
998
999 /*
1000 * Close all inherited fds for communication of the leader with
1001 * previously-forked workers.
1002 */
1003 for (j = 0; j < i; j++)
1004 {
1007 }
1008
1009 /* Run the worker ... */
1010 RunWorker(AH, slot);
1011
1012 /* We can just exit(0) when done */
1013 exit(0);
1014 }
1015 else if (pid < 0)
1016 {
1017 /* fork failed */
1018 pg_fatal("could not create worker process: %m");
1019 }
1020
1021 /* In Leader after successful fork */
1022 slot->pid = pid;
1023 slot->workerStatus = WRKR_IDLE;
1024
1025 /* close read end of Leader -> Worker */
1027 /* close write end of Worker -> Leader */
1029#endif /* WIN32 */
1030 }
1031
1032 /*
1033 * Having forked off the workers, disable SIGPIPE so that leader isn't
1034 * killed if it tries to send a command to a dead worker. We don't want
1035 * the workers to inherit this setting, though.
1036 */
1037#ifndef WIN32
1039#endif
1040
1041 /*
1042 * Re-establish query cancellation on the leader connection.
1043 */
1045
1046 /*
1047 * Tell the cancel signal handler to forward signals to worker processes,
1048 * too. (As with query cancel, we did not need this earlier because the
1049 * workers have not yet been given anything to do; if we die before this
1050 * point, any already-started workers will see EOF and quit promptly.)
1051 */
1052 set_cancel_pstate(pstate);
1053
1054 return pstate;
1055}

References DumpSignalInformation::am_worker, Assert, closesocket, _archiveHandle::connection, fb(), getLocalPQExpBuffer, i, j, ParallelState::numWorkers, Archive::numWorkers, ParallelState::parallelSlot, pg_fatal, pg_malloc(), pg_malloc0(), pgpipe, ParallelSlot::pid, PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRead, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, ParallelSlot::pipeWrite, pqsignal, ShutdownInformation::pstate, _archiveHandle::public, RunWorker(), set_archive_cancel_info(), set_cancel_pstate(), shutdown_info, signal_info, SIGPIPE, ParallelState::te, ParallelSlot::workerStatus, and WRKR_IDLE.

Referenced by _CloseArchive(), and RestoreArchive().

◆ parseWorkerCommand()

/*
 * parseWorkerCommand
 *		Decode a command string received from the leader into an action and
 *		the TOC entry it applies to.
 *
 * Accepted forms are exactly "DUMP <dumpId>" and "RESTORE <dumpId>".
 *
 * AH:  archive used to look up the TOC entry by dump ID
 * te:  output; set to the TOC entry named by the command
 * act: output; set to ACT_DUMP or ACT_RESTORE
 * msg: NUL-terminated command string
 *
 * Any other input is fatal.  This includes an unknown verb, a missing or
 * non-numeric ID, trailing garbage, or an ID with no TOC entry.  These
 * checks used to exist only as Asserts, so non-assert builds could
 * continue with an uninitialized dumpId or a NULL TOC entry.
 */
static void
parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
				   const char *msg)
{
	DumpId		dumpId = 0;
	int			nBytes = -1;	/* stays -1 if %n is never reached */
	int			nconv = 0;

	if (messageStartsWith(msg, "DUMP "))
	{
		*act = ACT_DUMP;
		nconv = sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
	}
	else if (messageStartsWith(msg, "RESTORE "))
	{
		*act = ACT_RESTORE;
		nconv = sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
	}
	else
		pg_fatal("unrecognized command received from leader: \"%s\"",
				 msg);

	/*
	 * %n does not count toward sscanf's result, so require exactly one
	 * conversion and that the whole string was consumed.
	 */
	if (nconv != 1 || nBytes < 0 || (size_t) nBytes != strlen(msg))
		pg_fatal("unrecognized command received from leader: \"%s\"",
				 msg);

	*te = getTocEntryByDumpId(AH, dumpId);
	if (*te == NULL)
		pg_fatal("unrecognized command received from leader: \"%s\"",
				 msg);
}

References ACT_DUMP, ACT_RESTORE, Assert, fb(), getTocEntryByDumpId(), messageStartsWith, and pg_fatal.

Referenced by WaitForCommands().

◆ parseWorkerResponse()

/*
 * parseWorkerResponse
 *		Decode a worker's status message for the job it was running on "te".
 *
 * The only accepted form is exactly "OK <dumpId> <status> <n_errors>".
 * The worker's error count is added to AH->public.n_errors.  The return
 * value is the <status> field.
 *
 * Anything malformed is fatal: trailing text, missing fields, or a dump ID
 * that does not match te.  This used to be checked only by Asserts, which
 * left dumpId/n_errors uninitialized in non-assert builds and dereferenced
 * te even when it was NULL.
 */
static int
parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
{
	DumpId		dumpId = 0;
	int			nBytes = -1,	/* stays -1 if %n is never reached */
				n_errors = 0;
	int			status = 0;

	if (!messageStartsWith(msg, "OK "))
		pg_fatal("invalid message received from worker: \"%s\"",
				 msg);

	/* %n does not count toward sscanf's result: expect three conversions */
	if (sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes) != 3 ||
		nBytes < 0 || (size_t) nBytes != strlen(msg))
		pg_fatal("invalid message received from worker: \"%s\"",
				 msg);

	/* The reply must be for the job we believe this worker was running */
	if (te == NULL || dumpId != te->dumpId)
		pg_fatal("invalid message received from worker: \"%s\"",
				 msg);

	AH->public.n_errors += n_errors;

	return status;
}

References Assert, _tocEntry::dumpId, fb(), messageStartsWith, Archive::n_errors, pg_fatal, and _archiveHandle::public.

Referenced by ListenToWorkers().

◆ readMessageFromPipe()

static char * readMessageFromPipe ( int  fd)
static

Definition at line 1664 of file parallel.c.

1665{
1666 char *msg;
1667 int msgsize,
1668 bufsize;
1669 int ret;
1670
1671 /*
1672 * In theory, if we let piperead() read multiple bytes, it might give us
1673 * back fragments of multiple messages. (That can't actually occur, since
1674 * neither leader nor workers send more than one message without waiting
1675 * for a reply, but we don't wish to assume that here.) For simplicity,
1676 * read a byte at a time until we get the terminating '\0'. This method
1677 * is a bit inefficient, but since this is only used for relatively short
1678 * command and status strings, it shouldn't matter.
1679 */
1680 bufsize = 64; /* could be any number */
1681 msg = (char *) pg_malloc(bufsize);
1682 msgsize = 0;
1683 for (;;)
1684 {
1686 ret = piperead(fd, msg + msgsize, 1);
1687 if (ret <= 0)
1688 break; /* error or connection closure */
1689
1690 Assert(ret == 1);
1691
1692 if (msg[msgsize] == '\0')
1693 return msg; /* collected whole message */
1694
1695 msgsize++;
1696 if (msgsize == bufsize) /* enlarge buffer if needed */
1697 {
1698 bufsize += 16; /* could be any number */
1699 msg = (char *) pg_realloc(msg, bufsize);
1700 }
1701 }
1702
1703 /* Other end has closed the connection */
1704 pg_free(msg);
1705 return NULL;
1706}

References Assert, bufsize, fb(), fd(), pg_free(), pg_malloc(), pg_realloc(), and piperead.

Referenced by getMessageFromLeader(), and getMessageFromWorker().

◆ RunWorker()

static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
static

Definition at line 831 of file parallel.c.

832{
833 int pipefd[2];
834
835 /* fetch child ends of pipes */
838
839 /*
840 * Clone the archive so that we have our own state to work with, and in
841 * particular our own database connection.
842 *
843 * We clone on Unix as well as Windows, even though technically we don't
844 * need to because fork() gives us a copy in our own address space
845 * already. But CloneArchive resets the state information and also clones
846 * the database connection which both seem kinda helpful.
847 */
848 AH = CloneArchive(AH);
849
850 /* Remember cloned archive where signal handler can find it */
851 set_cancel_slot_archive(slot, AH);
852
853 /*
854 * Call the setup worker function that's defined in the ArchiveHandle.
855 */
856 (AH->SetupWorkerPtr) ((Archive *) AH);
857
858 /*
859 * Execute commands until done.
860 */
862
863 /*
864 * Disconnect from database and clean up.
865 */
868 DeCloneArchive(AH);
869}

References CloneArchive(), DeCloneArchive(), DisconnectDatabase(), fb(), PIPE_READ, PIPE_WRITE, ParallelSlot::pipeRevRead, ParallelSlot::pipeRevWrite, _archiveHandle::public, set_cancel_slot_archive(), _archiveHandle::SetupWorkerPtr, and WaitForCommands().

Referenced by ParallelBackupStart().

◆ select_loop()

static int select_loop(int maxFd, fd_set *workerset)
static

Definition at line 1542 of file parallel.c.

1543{
1544 int i;
1546
1547 for (;;)
1548 {
1549 *workerset = saveSet;
1550 i = select(maxFd + 1, workerset, NULL, NULL, NULL);
1551
1552#ifndef WIN32
1553 if (i < 0 && errno == EINTR)
1554 continue;
1555#else
1556 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1557 continue;
1558#endif
1559 break;
1560 }
1561
1562 return i;
1563}

References EINTR, fb(), i, and select.

Referenced by getMessageFromWorker().

◆ sendMessageToLeader()

static void sendMessageToLeader ( int  pipefd[2],
const char str 
)
static

Definition at line 1529 of file parallel.c.

1530{
1531 int len = strlen(str) + 1;
1532
1533 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1534 pg_fatal("could not write to the communication channel: %m");
1535}

References fb(), len, pg_fatal, PIPE_WRITE, pipewrite, and str.

Referenced by WaitForCommands().

◆ sendMessageToWorker()

/*
 * sendMessageToWorker
 *		Leader side: write "str", including its terminating NUL, to the
 *		command pipe of worker number "worker".
 *
 * The trailing NUL delimits the message for readMessageFromPipe() on the
 * worker side.  A short or failed write is fatal.
 */
static void
sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
{
	ParallelSlot *target = &pstate->parallelSlot[worker];
	int			msgLen = strlen(str) + 1;	/* + 1 for the NUL delimiter */

	if (pipewrite(target->pipeWrite, str, msgLen) == msgLen)
		return;

	pg_fatal("could not write to the communication channel: %m");
}

References fb(), len, ParallelState::parallelSlot, pg_fatal, ParallelSlot::pipeWrite, pipewrite, and str.

Referenced by DispatchJobForTocEntry().

◆ set_archive_cancel_info()

void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)

Definition at line 732 of file parallel.c.

733{
735
736 /*
737 * Activate the interrupt handler if we didn't yet in this process. On
738 * Windows, this also initializes signal_info_lock; therefore it's
739 * important that this happen at least once before we fork off any
740 * threads.
741 */
743
744 /*
745 * On Unix, we assume that storing a pointer value is atomic with respect
746 * to any possible signal interrupt. On Windows, use a critical section.
747 */
748
749#ifdef WIN32
751#endif
752
753 /* Free the old one if we have one */
755 /* be sure interrupt handler doesn't use pointer while freeing */
756 AH->connCancel = NULL;
757
758 if (oldConnCancel != NULL)
760
761 /* Set the new one if specified */
762 if (conn)
764
765 /*
766 * On Unix, there's only ever one active ArchiveHandle per process, so we
767 * can just set signal_info.myAH unconditionally. On Windows, do that
768 * only in the main thread; worker threads have to make sure their
769 * ArchiveHandle appears in the pstate data, which is dealt with in
770 * RunWorker().
771 */
772#ifndef WIN32
773 signal_info.myAH = AH;
774#else
776 signal_info.myAH = AH;
777#endif
778
779#ifdef WIN32
781#endif
782}

References conn, _archiveHandle::connCancel, fb(), DumpSignalInformation::myAH, PQfreeCancel(), PQgetCancel(), set_cancel_handler(), and signal_info.

Referenced by ConnectDatabaseAhx(), DisconnectDatabase(), and ParallelBackupStart().

◆ set_cancel_handler()

static void set_cancel_handler ( void  )
static

Definition at line 610 of file parallel.c.

611{
612 /*
613 * When forking, signal_info.handler_set will propagate into the new
614 * process, but that's fine because the signal handler state does too.
615 */
617 {
619
623 }
624}

References fb(), DumpSignalInformation::handler_set, pqsignal, signal_info, SIGQUIT, and sigTermHandler().

Referenced by set_archive_cancel_info().

◆ set_cancel_pstate()

static void set_cancel_pstate(ParallelState *pstate)
static

Definition at line 791 of file parallel.c.

792{
793#ifdef WIN32
795#endif
796
797 signal_info.pstate = pstate;
798
799#ifdef WIN32
801#endif
802}

References fb(), DumpSignalInformation::pstate, and signal_info.

Referenced by ParallelBackupEnd(), and ParallelBackupStart().

◆ set_cancel_slot_archive()

static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
static

Definition at line 811 of file parallel.c.

812{
813#ifdef WIN32
815#endif
816
817 slot->AH = AH;
818
819#ifdef WIN32
821#endif
822}

References ParallelSlot::AH, and fb().

Referenced by RunWorker().

◆ ShutdownWorkersHard()

static void ShutdownWorkersHard(ParallelState *pstate)
static

Definition at line 397 of file parallel.c.

398{
399 int i;
400
401 /*
402 * Close our write end of the sockets so that any workers waiting for
403 * commands know they can exit. (Note: some of the pipeWrite fields might
404 * still be zero, if we failed to initialize all the workers. Hence, just
405 * ignore errors here.)
406 */
407 for (i = 0; i < pstate->numWorkers; i++)
409
410 /*
411 * Force early termination of any commands currently in progress.
412 */
413#ifndef WIN32
414 /* On non-Windows, send SIGTERM to each worker process. */
415 for (i = 0; i < pstate->numWorkers; i++)
416 {
417 pid_t pid = pstate->parallelSlot[i].pid;
418
419 if (pid != 0)
420 kill(pid, SIGTERM);
421 }
422#else
423
424 /*
425 * On Windows, send query cancels directly to the workers' backends. Use
426 * a critical section to ensure worker threads don't change state.
427 */
429 for (i = 0; i < pstate->numWorkers; i++)
430 {
431 ArchiveHandle *AH = pstate->parallelSlot[i].AH;
432 char errbuf[1];
433
434 if (AH != NULL && AH->connCancel != NULL)
435 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
436 }
438#endif
439
440 /* Now wait for them to terminate. */
442}

References ParallelSlot::AH, closesocket, _archiveHandle::connCancel, fb(), i, kill, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, ParallelSlot::pipeWrite, PQcancel(), and WaitForTerminatingWorkers().

Referenced by archive_close_connection().

◆ sigTermHandler()

static void sigTermHandler ( SIGNAL_ARGS  )
static

Definition at line 547 of file parallel.c.

548{
549 int i;
550 char errbuf[1];
551
552 /*
553 * Some platforms allow delivery of new signals to interrupt an active
554 * signal handler. That could muck up our attempt to send PQcancel, so
555 * disable the signals that set_cancel_handler enabled.
556 */
560
561 /*
562 * If we're in the leader, forward signal to all workers. (It seems best
563 * to do this before PQcancel; killing the leader transaction will result
564 * in invalid-snapshot errors from active workers, which maybe we can
565 * quiet by killing workers first.) Ignore any errors.
566 */
567 if (signal_info.pstate != NULL)
568 {
569 for (i = 0; i < signal_info.pstate->numWorkers; i++)
570 {
572
573 if (pid != 0)
574 kill(pid, SIGTERM);
575 }
576 }
577
578 /*
579 * Send QueryCancel if we have a connection to send to. Ignore errors,
580 * there's not much we can do about them anyway.
581 */
583 (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
584
585 /*
586 * Report we're quitting, using nothing more complicated than write(2).
587 * When in parallel operation, only the leader process should do this.
588 */
590 {
591 if (progname)
592 {
594 write_stderr(": ");
595 }
596 write_stderr("terminated by user\n");
597 }
598
599 /*
600 * And die, using _exit() not exit() because the latter will invoke atexit
601 * handlers that can fail if we interrupted related code.
602 */
603 _exit(1);
604}

References DumpSignalInformation::am_worker, _archiveHandle::connCancel, fb(), i, kill, DumpSignalInformation::myAH, ParallelState::numWorkers, ParallelState::parallelSlot, ParallelSlot::pid, PQcancel(), pqsignal, progname, DumpSignalInformation::pstate, signal_info, SIGQUIT, and write_stderr.

Referenced by set_cancel_handler().

◆ WaitForCommands()

static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
static

Definition at line 1338 of file parallel.c.

1339{
1340 char *command;
1341 TocEntry *te;
1342 T_Action act;
1343 int status = 0;
1344 char buf[256];
1345
1346 for (;;)
1347 {
1348 if (!(command = getMessageFromLeader(pipefd)))
1349 {
1350 /* EOF, so done */
1351 return;
1352 }
1353
1354 /* Decode the command */
1355 parseWorkerCommand(AH, &te, &act, command);
1356
1357 if (act == ACT_DUMP)
1358 {
1359 /* Acquire lock on this table within the worker's session */
1360 lockTableForWorker(AH, te);
1361
1362 /* Perform the dump command */
1363 status = (AH->WorkerJobDumpPtr) (AH, te);
1364 }
1365 else if (act == ACT_RESTORE)
1366 {
1367 /* Perform the restore command */
1368 status = (AH->WorkerJobRestorePtr) (AH, te);
1369 }
1370 else
1371 Assert(false);
1372
1373 /* Return status to leader */
1374 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1375
1377
1378 /* command was pg_malloc'd and we are responsible for free()ing it. */
1379 free(command);
1380 }
1381}

References ACT_DUMP, ACT_RESTORE, Assert, buf, buildWorkerResponse(), fb(), free, getMessageFromLeader(), lockTableForWorker(), parseWorkerCommand(), sendMessageToLeader(), _archiveHandle::WorkerJobDumpPtr, and _archiveHandle::WorkerJobRestorePtr.

Referenced by RunWorker().

◆ WaitForTerminatingWorkers()

static void WaitForTerminatingWorkers(ParallelState *pstate)
static

Definition at line 448 of file parallel.c.

449{
450 while (!HasEveryWorkerTerminated(pstate))
451 {
452 ParallelSlot *slot = NULL;
453 int j;
454
455#ifndef WIN32
456 /* On non-Windows, use wait() to wait for next worker to end */
457 int status;
458 pid_t pid = wait(&status);
459
460 /* Find dead worker's slot, and clear the PID field */
461 for (j = 0; j < pstate->numWorkers; j++)
462 {
463 slot = &(pstate->parallelSlot[j]);
464 if (slot->pid == pid)
465 {
466 slot->pid = 0;
467 break;
468 }
469 }
470#else /* WIN32 */
471 /* On Windows, we must use WaitForMultipleObjects() */
472 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
473 int nrun = 0;
474 DWORD ret;
476
477 for (j = 0; j < pstate->numWorkers; j++)
478 {
480 {
481 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
482 nrun++;
483 }
484 }
486 Assert(ret != WAIT_FAILED);
489
490 /* Find dead worker's slot, and clear the hThread field */
491 for (j = 0; j < pstate->numWorkers; j++)
492 {
493 slot = &(pstate->parallelSlot[j]);
494 if (slot->hThread == hThread)
495 {
496 /* For cleanliness, close handles for dead threads */
497 CloseHandle((HANDLE) slot->hThread);
498 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
499 break;
500 }
501 }
502#endif /* WIN32 */
503
504 /* On all platforms, update workerStatus and te[] as well */
505 Assert(j < pstate->numWorkers);
507 pstate->te[j] = NULL;
508 }
509}

References Assert, fb(), free, HasEveryWorkerTerminated(), j, ParallelState::numWorkers, ParallelState::parallelSlot, pg_malloc(), ParallelSlot::pid, ParallelState::te, WORKER_IS_RUNNING, ParallelSlot::workerStatus, and WRKR_TERMINATED.

Referenced by ParallelBackupEnd(), and ShutdownWorkersHard().

◆ WaitForWorkers()

/*
 * WaitForWorkers
 *		Process worker status messages until the condition named by "mode"
 *		holds.
 *
 * WFW_NO_WAIT:    poll once without blocking, then return.
 * WFW_GOT_STATUS: block until one status message has been processed
 *                 (the caller must have at least one busy worker).
 * WFW_ONE_IDLE:   return once GetIdleWorker() finds an idle worker.
 * WFW_ALL_IDLE:   return once IsEveryWorkerIdle() is true.
 *
 * The first ListenToWorkers() call blocks only in WFW_GOT_STATUS mode.
 * Every later iteration blocks.
 */
void
WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
{
	bool		do_wait = (mode == WFW_GOT_STATUS);

	/* Blocking for a status message only makes sense if someone is busy */
	Assert(mode != WFW_GOT_STATUS || !IsEveryWorkerIdle(pstate));

	for (;; do_wait = true)
	{
		bool		done = false;

		/*
		 * Pick up at most one status message per iteration.  A processed
		 * message also means at least one worker just became idle, which
		 * satisfies every mode except WFW_ALL_IDLE.
		 */
		if (ListenToWorkers(AH, pstate, do_wait) && mode != WFW_ALL_IDLE)
			return;

		switch (mode)
		{
			case WFW_NO_WAIT:
				done = true;	/* never block */
				break;
			case WFW_GOT_STATUS:
				/* unreachable: we blocked, so a message must have arrived */
				Assert(false);
				break;
			case WFW_ONE_IDLE:
				done = (GetIdleWorker(pstate) != NO_SLOT);
				break;
			case WFW_ALL_IDLE:
				done = IsEveryWorkerIdle(pstate);
				break;
		}

		if (done)
			return;
	}
}

References Assert, do_wait, GetIdleWorker(), IsEveryWorkerIdle(), ListenToWorkers(), mode, NO_SLOT, WFW_ALL_IDLE, WFW_GOT_STATUS, WFW_NO_WAIT, and WFW_ONE_IDLE.

Referenced by DispatchJobForTocEntry(), restore_toc_entries_parallel(), and WriteDataChunks().

Variable Documentation

◆ shutdown_info

ShutdownInformation shutdown_info
static

Definition at line 154 of file parallel.c.

Referenced by on_exit_close_archive(), ParallelBackupEnd(), and ParallelBackupStart().

◆ signal_info