/*
 * WORKER_IS_RUNNING: evaluates to true when workerStatus equals WRKR_IDLE
 * or WRKR_WORKING, and to false for any other status value.
 *
 * The argument is expanded twice, so pass an expression without side
 * effects.
 */
85 #define WORKER_IS_RUNNING(workerStatus) \
86 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
113 unsigned int threadId;
/*
 * Forward declaration of pgpipe(), a file-local function taking a
 * two-element array of integer handles and returning int.
 *
 * NOTE(review): presumably this is the socket-based pipe() substitute whose
 * error messages are prefixed "pgpipe:" further down; the full definition
 * and the enclosing platform conditional are not visible here -- confirm.
 */
132 static int pgpipe(
int handles[2]);
/*
 * piperead/pipewrite implemented on top of recv()/send(), always passing 0
 * as the flags argument.
 *
 * NOTE(review): presumably the socket (Windows) variant of these wrappers;
 * the surrounding #ifdef is not visible here -- confirm.
 */
133 #define piperead(a,b,c) recv(a,b,c,0)
134 #define pipewrite(a,b,c) send(a,b,c,0)
/*
 * Variant in which the pipe wrappers map directly onto pipe(), read() and
 * write() with their arguments passed through unchanged.
 *
 * NOTE(review): presumably the alternative branch to the recv()/send()
 * definitions of the same macro names; the preprocessor conditional that
 * selects between them is not visible here -- confirm.
 */
139 #define pgpipe(a) pipe(a)
140 #define piperead(a,b,c) read(a,b,c)
141 #define pipewrite(a,b,c) write(a,b,c)
178 static CRITICAL_SECTION signal_info_lock;
186 #define write_stderr(str) \
188 const char *str_ = (str); \
190 rc_ = write(fileno(stderr), str_, strlen(str_)); \
197 static DWORD tls_index;
/*
 * Flag with external linkage, initially false.  Code later in this file
 * tests it before doing one-time setup and then sets it to true; other code
 * branches on it when reading or storing a thread-local value.
 */
200 bool parallel_init_done =
false;
221 static int select_loop(
int maxFd, fd_set *workerset);
225 int worker,
const char *
str);
/*
 * messageStartsWith: true when the first strlen(prefix) bytes of msg
 * compare equal to prefix (checked with strncmp).
 *
 * prefix is expanded twice (once for strncmp, once for strlen), so it
 * should be a side-effect-free expression such as a string literal.
 */
228 #define messageStartsWith(msg, prefix) \
229 (strncmp(msg, prefix, strlen(prefix)) == 0)
241 if (!parallel_init_done)
247 tls_index = TlsAlloc();
248 mainThreadId = GetCurrentThreadId();
251 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
253 pg_fatal(
"%s() failed: error code %d",
"WSAStartup",
err);
255 parallel_init_done =
true;
273 if (pstate->
parallelSlot[
i].threadId == GetCurrentThreadId())
291 getThreadLocalPQExpBuffer(
void)
301 if (parallel_init_done)
304 id_return = s_id_return;
315 if (parallel_init_done)
316 TlsSetValue(tls_index, id_return);
318 s_id_return = id_return;
428 EnterCriticalSection(&signal_info_lock);
437 LeaveCriticalSection(&signal_info_lock);
458 pid_t pid = wait(&status);
464 if (slot->
pid == pid)
485 ret = WaitForMultipleObjects(nrun, lpHandles,
false, INFINITE);
486 Assert(ret != WAIT_FAILED);
487 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
494 if (slot->hThread == hThread)
497 CloseHandle((HANDLE) slot->hThread);
498 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
505 Assert(j < pstate->numWorkers);
507 pstate->
te[
j] = NULL;
636 consoleHandler(DWORD dwCtrlType)
641 if (dwCtrlType == CTRL_C_EVENT ||
642 dwCtrlType == CTRL_BREAK_EVENT)
645 EnterCriticalSection(&signal_info_lock);
663 HANDLE hThread = (HANDLE) slot->hThread;
670 if (hThread != INVALID_HANDLE_VALUE)
671 TerminateThread(hThread, 0);
684 errbuf,
sizeof(errbuf));
686 LeaveCriticalSection(&signal_info_lock);
716 InitializeCriticalSection(&signal_info_lock);
718 SetConsoleCtrlHandler(consoleHandler, TRUE);
750 EnterCriticalSection(&signal_info_lock);
758 if (oldConnCancel != NULL)
775 if (mainThreadId == GetCurrentThreadId())
780 LeaveCriticalSection(&signal_info_lock);
794 EnterCriticalSection(&signal_info_lock);
800 LeaveCriticalSection(&signal_info_lock);
814 EnterCriticalSection(&signal_info_lock);
820 LeaveCriticalSection(&signal_info_lock);
875 static unsigned __stdcall
961 pg_fatal(
"could not create communication channels: %m");
977 handle = _beginthreadex(NULL, 0, (
void *) &init_spawned_worker_win32,
978 wi, 0, &(slot->threadId));
979 slot->hThread = handle;
989 slot->
pid = getpid();
1003 for (
j = 0;
j <
i;
j++)
1018 pg_fatal(
"could not create worker process: %m");
1111 char *
buf,
int buflen)
1134 sscanf(msg,
"DUMP %d%n", &dumpId, &nBytes);
1135 Assert(nBytes == strlen(msg));
1142 sscanf(msg,
"RESTORE %d%n", &dumpId, &nBytes);
1143 Assert(nBytes == strlen(msg));
1148 pg_fatal(
"unrecognized command received from leader: \"%s\"",
1159 char *
buf,
int buflen)
1183 sscanf(msg,
"OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1186 Assert(nBytes == strlen(msg));
1191 pg_fatal(
"invalid message received from worker: \"%s\"",
1212 void *callback_data)
1230 pstate->
te[worker] = te;
1310 if (strcmp(te->
desc,
"BLOBS") == 0)
1323 pg_fatal(
"could not obtain lock on relation \"%s\"\n"
1324 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1325 "on the table after the pg_dump parent process had gotten the "
1326 "initial ACCESS SHARE lock on the table.", qualId);
1412 pg_fatal(
"a worker process died unexpectedly");
1426 pstate->
te[worker] = NULL;
1429 pg_fatal(
"invalid message received from worker: \"%s\"",
1531 int len = strlen(
str) + 1;
1534 pg_fatal(
"could not write to the communication channel: %m");
1545 fd_set saveSet = *workerset;
1549 *workerset = saveSet;
1550 i =
select(maxFd + 1, workerset, NULL, NULL, NULL);
1553 if (
i < 0 && errno ==
EINTR)
1556 if (
i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1586 struct timeval nowait = {0, 0};
1589 FD_ZERO(&workerset);
1606 if ((
i =
select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1611 pg_fatal(
"%s() failed: %m",
"select");
1648 int len = strlen(
str) + 1;
1652 pg_fatal(
"could not write to the communication channel: %m");
1692 if (msg[msgsize] ==
'\0')
1725 struct sockaddr_in serv_addr;
1726 int len =
sizeof(serv_addr);
1729 handles[0] = handles[1] = -1;
1736 pg_log_error(
"pgpipe: could not create socket: error code %d",
1741 memset(&serv_addr, 0,
sizeof(serv_addr));
1742 serv_addr.sin_family = AF_INET;
1744 serv_addr.sin_addr.s_addr =
pg_hton32(INADDR_LOOPBACK);
1745 if (
bind(s, (SOCKADDR *) &serv_addr,
len) == SOCKET_ERROR)
1752 if (
listen(s, 1) == SOCKET_ERROR)
1754 pg_log_error(
"pgpipe: could not listen: error code %d",
1759 if (getsockname(s, (SOCKADDR *) &serv_addr, &
len) == SOCKET_ERROR)
1761 pg_log_error(
"pgpipe: %s() failed: error code %d",
"getsockname",
1772 pg_log_error(
"pgpipe: could not create second socket: error code %d",
1777 handles[1] = (int) tmp_sock;
1779 if (
connect(handles[1], (SOCKADDR *) &serv_addr,
len) == SOCKET_ERROR)
1781 pg_log_error(
"pgpipe: could not connect socket: error code %d",
1790 pg_log_error(
"pgpipe: could not accept connection: error code %d",
1797 handles[0] = (int) tmp_sock;
struct WorkerInfoData * WorkerInfo
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
static void sendMessageToLeader(int pipefd[2], const char *str)
struct DumpSignalInformation DumpSignalInformation
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
static bool HasEveryWorkerTerminated(ParallelState *pstate)
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
static void sigTermHandler(SIGNAL_ARGS)
static char * readMessageFromPipe(int fd)
static int select_loop(int maxFd, fd_set *workerset)
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
static int GetIdleWorker(ParallelState *pstate)
static void set_cancel_pstate(ParallelState *pstate)
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
static void archive_close_connection(int code, void *arg)
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
static ShutdownInformation shutdown_info
void on_exit_close_archive(Archive *AHX)
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
#define WORKER_IS_RUNNING(workerStatus)
static char * getMessageFromLeader(int pipefd[2])
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
#define piperead(a, b, c)
struct ShutdownInformation ShutdownInformation
#define pipewrite(a, b, c)
void init_parallel_dump_utils(void)
static void set_cancel_handler(void)
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
static volatile DumpSignalInformation signal_info
bool IsEveryWorkerIdle(ParallelState *pstate)
#define write_stderr(str)
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
#define messageStartsWith(msg, prefix)
static void ShutdownWorkersHard(ParallelState *pstate)
static void WaitForTerminatingWorkers(ParallelState *pstate)
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
#define Assert(condition)
static void PGresult * res
void err(int eval, const char *fmt,...)
PGcancel * PQgetCancel(PGconn *conn)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
void PQfreeCancel(PGcancel *cancel)
ExecStatusType PQresultStatus(const PGresult *res)
PGresult * PQexec(PGconn *conn, const char *query)
void * pg_realloc(void *ptr, size_t size)
void * pg_malloc0(size_t size)
void * pg_malloc(size_t size)
if(TABLE==NULL||TABLE_index==NULL)
static void const char fflush(stdout)
#define pg_log_error(...)
void DisconnectDatabase(Archive *AHX)
void DeCloneArchive(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
#define WORKER_IGNORED_ERRORS
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static PgChecksumMode mode
pqsigfunc pqsignal(int signo, pqsigfunc func)
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
PQExpBufferData * PQExpBuffer
static int fd(const char *x, int i)
const char * fmtQualifiedId(const char *schema, const char *id)
PQExpBuffer(* getLocalPQExpBuffer)(void)
ParallelCompletionPtr callback
T_WorkerStatus workerStatus
ParallelSlot * parallelSlot
WorkerJobDumpPtrType WorkerJobDumpPtr
PGcancel *volatile connCancel
WorkerJobRestorePtrType WorkerJobRestorePtr
SetupWorkerPtrType SetupWorkerPtr
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
#define bind(s, addr, addrlen)
#define socket(af, type, protocol)
#define accept(s, addr, addrlen)
#define connect(s, name, namelen)
#define listen(s, backlog)
#define select(n, r, w, e, timeout)