61 #ifdef HAVE_SYS_SELECT_H 62 #include <sys/select.h> 85 #define WORKER_IS_RUNNING(workerStatus) \ 86 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING) 113 unsigned int threadId;
132 static int pgpipe(
int handles[2]);
133 #define piperead(a,b,c) recv(a,b,c,0) 134 #define pipewrite(a,b,c) send(a,b,c,0) 139 #define pgpipe(a) pipe(a) 140 #define piperead(a,b,c) read(a,b,c) 141 #define pipewrite(a,b,c) write(a,b,c) 178 static CRITICAL_SECTION signal_info_lock;
186 #define write_stderr(str) \ 188 const char *str_ = (str); \ 190 rc_ = write(fileno(stderr), str_, strlen(str_)); \ 197 static DWORD tls_index;
200 bool parallel_init_done =
false;
221 static int select_loop(
int maxFd, fd_set *workerset);
225 int worker,
const char *
str);
228 #define messageStartsWith(msg, prefix) \ 229 (strncmp(msg, prefix, strlen(prefix)) == 0) 241 if (!parallel_init_done)
247 tls_index = TlsAlloc();
248 mainThreadId = GetCurrentThreadId();
251 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
258 parallel_init_done =
true;
276 if (pstate->
parallelSlot[i].threadId == GetCurrentThreadId())
294 getThreadLocalPQExpBuffer(
void)
304 if (parallel_init_done)
307 id_return = s_id_return;
318 if (parallel_init_done)
319 TlsSetValue(tls_index, id_return);
321 s_id_return = id_return;
335 shutdown_info.
AHX = AHX;
431 EnterCriticalSection(&signal_info_lock);
440 LeaveCriticalSection(&signal_info_lock);
461 pid_t
pid = wait(&status);
467 if (slot->
pid == pid)
484 lpHandles[nrun] = (HANDLE) pstate->
parallelSlot[j].hThread;
488 ret = WaitForMultipleObjects(nrun, lpHandles,
false, INFINITE);
489 Assert(ret != WAIT_FAILED);
490 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
497 if (slot->hThread == hThread)
500 CloseHandle((HANDLE) slot->hThread);
501 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
508 Assert(j < pstate->numWorkers);
510 pstate->
te[j] = NULL;
570 if (signal_info.
pstate != NULL)
639 consoleHandler(DWORD dwCtrlType)
644 if (dwCtrlType == CTRL_C_EVENT ||
645 dwCtrlType == CTRL_BREAK_EVENT)
648 EnterCriticalSection(&signal_info_lock);
660 if (signal_info.
pstate != NULL)
666 HANDLE hThread = (HANDLE) slot->hThread;
673 if (hThread != INVALID_HANDLE_VALUE)
674 TerminateThread(hThread, 0);
687 errbuf,
sizeof(errbuf));
689 LeaveCriticalSection(&signal_info_lock);
719 InitializeCriticalSection(&signal_info_lock);
721 SetConsoleCtrlHandler(consoleHandler, TRUE);
753 EnterCriticalSection(&signal_info_lock);
761 if (oldConnCancel != NULL)
778 if (mainThreadId == GetCurrentThreadId())
783 LeaveCriticalSection(&signal_info_lock);
797 EnterCriticalSection(&signal_info_lock);
800 signal_info.
pstate = pstate;
803 LeaveCriticalSection(&signal_info_lock);
817 EnterCriticalSection(&signal_info_lock);
823 LeaveCriticalSection(&signal_info_lock);
878 static unsigned __stdcall
935 shutdown_info.
pstate = pstate;
964 fatal(
"could not create communication channels: %m");
980 handle = _beginthreadex(NULL, 0, (
void *) &init_spawned_worker_win32,
981 wi, 0, &(slot->threadId));
982 slot->hThread = handle;
992 slot->
pid = getpid();
1006 for (j = 0; j <
i; j++)
1021 fatal(
"could not create worker process: %m");
1089 shutdown_info.
pstate = NULL;
1114 char *
buf,
int buflen)
1137 sscanf(msg,
"DUMP %d%n", &dumpId, &nBytes);
1138 Assert(nBytes == strlen(msg));
1145 sscanf(msg,
"RESTORE %d%n", &dumpId, &nBytes);
1146 Assert(nBytes == strlen(msg));
1151 fatal(
"unrecognized command received from leader: \"%s\"",
1162 char *
buf,
int buflen)
1164 snprintf(buf, buflen,
"OK %d %d %d",
1186 sscanf(msg,
"OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1189 Assert(nBytes == strlen(msg));
1194 fatal(
"invalid message received from worker: \"%s\"",
1233 pstate->
te[worker] = te;
1313 if (strcmp(te->
desc,
"BLOBS") == 0)
1326 fatal(
"could not obtain lock on relation \"%s\"\n" 1327 "This usually means that someone requested an ACCESS EXCLUSIVE lock " 1328 "on the table after the pg_dump parent process had gotten the " 1329 "initial ACCESS SHARE lock on the table.", qualId);
1415 fatal(
"a worker process died unexpectedly");
1429 pstate->
te[worker] = NULL;
1432 fatal(
"invalid message received from worker: \"%s\"",
1534 int len = strlen(str) + 1;
1537 fatal(
"could not write to the communication channel: %m");
1548 fd_set saveSet = *workerset;
1552 *workerset = saveSet;
1553 i =
select(maxFd + 1, workerset, NULL, NULL, NULL);
1556 if (i < 0 && errno ==
EINTR)
1559 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1589 struct timeval nowait = {0, 0};
1592 FD_ZERO(&workerset);
1609 if ((i =
select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1614 fatal(
"select() failed: %m");
1651 int len = strlen(str) + 1;
1655 fatal(
"could not write to the communication channel: %m");
1688 Assert(msgsize < bufsize);
1689 ret =
piperead(fd, msg + msgsize, 1);
1695 if (msg[msgsize] ==
'\0')
1699 if (msgsize == bufsize)
1728 struct sockaddr_in serv_addr;
1729 int len =
sizeof(serv_addr);
1732 handles[0] = handles[1] = -1;
1739 pg_log_error(
"pgpipe: could not create socket: error code %d",
1744 memset((
void *) &serv_addr, 0,
sizeof(serv_addr));
1745 serv_addr.sin_family = AF_INET;
1747 serv_addr.sin_addr.s_addr =
pg_hton32(INADDR_LOOPBACK);
1748 if (
bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1755 if (
listen(s, 1) == SOCKET_ERROR)
1757 pg_log_error(
"pgpipe: could not listen: error code %d",
1762 if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1764 pg_log_error(
"pgpipe: getsockname() failed: error code %d",
1775 pg_log_error(
"pgpipe: could not create second socket: error code %d",
1780 handles[1] = (int) tmp_sock;
1782 if (
connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1784 pg_log_error(
"pgpipe: could not connect socket: error code %d",
1793 pg_log_error(
"pgpipe: could not accept connection: error code %d",
1800 handles[0] = (int) tmp_sock;
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static PgChecksumMode mode
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
PQExpBufferData * PQExpBuffer
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
#define accept(s, addr, addrlen)
static void set_cancel_pstate(ParallelState *pstate)
struct DumpSignalInformation DumpSignalInformation
bool IsEveryWorkerIdle(ParallelState *pstate)
void * pg_malloc(size_t size)
static int select_loop(int maxFd, fd_set *workerset)
#define pg_log_error(...)
struct WorkerInfoData * WorkerInfo
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
void PQfreeCancel(PGcancel *cancel)
static int GetIdleWorker(ParallelState *pstate)
struct ShutdownInformation ShutdownInformation
#define write_stderr(str)
static bool HasEveryWorkerTerminated(ParallelState *pstate)
static void setup_cancel_handler(void)
#define connect(s, name, namelen)
PGcancel *volatile connCancel
static void ShutdownWorkersHard(ParallelState *pstate)
void on_exit_close_archive(Archive *AHX)
#define bind(s, addr, addrlen)
SetupWorkerPtrType SetupWorkerPtr
static int fd(const char *x, int i)
ExecStatusType PQresultStatus(const PGresult *res)
#define WORKER_IGNORED_ERRORS
void DeCloneArchive(ArchiveHandle *AH)
void destroyPQExpBuffer(PQExpBuffer str)
void * pg_malloc0(size_t size)
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
static char * readMessageFromPipe(int fd)
PGcancel * PQgetCancel(PGconn *conn)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
#define select(n, r, w, e, timeout)
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
static void sendMessageToLeader(int pipefd[2], const char *str)
void * pg_realloc(void *ptr, size_t size)
void DisconnectDatabase(Archive *AHX)
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
ParallelSlot * parallelSlot
#define listen(s, backlog)
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
#define exit_nicely(code)
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
#define socket(af, type, protocol)
PQExpBuffer createPQExpBuffer(void)
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
void PQclear(PGresult *res)
void init_parallel_dump_utils(void)
pqsigfunc pqsignal(int signum, pqsigfunc handler)
static ShutdownInformation shutdown_info
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
#define Assert(condition)
ParallelCompletionPtr callback
PQExpBuffer(* getLocalPQExpBuffer)(void)
const char * fmtQualifiedId(const char *schema, const char *id)
static volatile DumpSignalInformation signal_info
T_WorkerStatus workerStatus
#define piperead(a, b, c)
#define WORKER_IS_RUNNING(workerStatus)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
static char * getMessageFromLeader(int pipefd[2])
WorkerJobRestorePtrType WorkerJobRestorePtr
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
PGresult * PQexec(PGconn *conn, const char *query)
#define pipewrite(a, b, c)
static void WaitForTerminatingWorkers(ParallelState *pstate)
static void sigTermHandler(SIGNAL_ARGS)
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
static void archive_close_connection(int code, void *arg)
void resetPQExpBuffer(PQExpBuffer str)
static void static void status(const char *fmt,...) pg_attribute_printf(1
#define messageStartsWith(msg, prefix)
WorkerJobDumpPtrType WorkerJobDumpPtr
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])