85#define WORKER_IS_RUNNING(workerStatus) \
86 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
113 unsigned int threadId;
132static int pgpipe(
int handles[2]);
133#define piperead(a,b,c) recv(a,b,c,0)
134#define pipewrite(a,b,c) send(a,b,c,0)
139#define pgpipe(a) pipe(a)
140#define piperead(a,b,c) read(a,b,c)
141#define pipewrite(a,b,c) write(a,b,c)
178static CRITICAL_SECTION signal_info_lock;
186#define write_stderr(str) \
188 const char *str_ = (str); \
190 rc_ = write(fileno(stderr), str_, strlen(str_)); \
197static DWORD tls_index;
200bool parallel_init_done =
false;
221static int select_loop(
int maxFd, fd_set *workerset);
225 int worker,
const char *
str);
228#define messageStartsWith(msg, prefix) \
229 (strncmp(msg, prefix, strlen(prefix)) == 0)
241 if (!parallel_init_done)
247 tls_index = TlsAlloc();
248 mainThreadId = GetCurrentThreadId();
251 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
253 pg_fatal(
"%s() failed: error code %d",
"WSAStartup",
err);
255 parallel_init_done =
true;
273 if (pstate->
parallelSlot[
i].threadId == GetCurrentThreadId())
291getThreadLocalPQExpBuffer(
void)
301 if (parallel_init_done)
304 id_return = s_id_return;
315 if (parallel_init_done)
316 TlsSetValue(tls_index, id_return);
318 s_id_return = id_return;
438 EnterCriticalSection(&signal_info_lock);
447 LeaveCriticalSection(&signal_info_lock);
468 pid_t pid = wait(&status);
474 if (slot->
pid == pid)
495 ret = WaitForMultipleObjects(nrun, lpHandles,
false, INFINITE);
496 Assert(ret != WAIT_FAILED);
497 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
504 if (slot->hThread == hThread)
507 CloseHandle((HANDLE) slot->hThread);
508 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
515 Assert(j < pstate->numWorkers);
517 pstate->
te[
j] = NULL;
646consoleHandler(DWORD dwCtrlType)
651 if (dwCtrlType == CTRL_C_EVENT ||
652 dwCtrlType == CTRL_BREAK_EVENT)
655 EnterCriticalSection(&signal_info_lock);
673 HANDLE hThread = (HANDLE) slot->hThread;
680 if (hThread != INVALID_HANDLE_VALUE)
681 TerminateThread(hThread, 0);
694 errbuf,
sizeof(errbuf));
696 LeaveCriticalSection(&signal_info_lock);
726 InitializeCriticalSection(&signal_info_lock);
728 SetConsoleCtrlHandler(consoleHandler, TRUE);
760 EnterCriticalSection(&signal_info_lock);
768 if (oldConnCancel != NULL)
785 if (mainThreadId == GetCurrentThreadId())
790 LeaveCriticalSection(&signal_info_lock);
804 EnterCriticalSection(&signal_info_lock);
810 LeaveCriticalSection(&signal_info_lock);
824 EnterCriticalSection(&signal_info_lock);
830 LeaveCriticalSection(&signal_info_lock);
885static unsigned __stdcall
971 pg_fatal(
"could not create communication channels: %m");
987 handle = _beginthreadex(NULL, 0, (
void *) &init_spawned_worker_win32,
988 wi, 0, &(slot->threadId));
989 slot->hThread = handle;
999 slot->
pid = getpid();
1013 for (
j = 0;
j <
i;
j++)
1028 pg_fatal(
"could not create worker process: %m");
1121 char *
buf,
int buflen)
1144 sscanf(msg,
"DUMP %d%n", &dumpId, &nBytes);
1145 Assert(nBytes == strlen(msg));
1152 sscanf(msg,
"RESTORE %d%n", &dumpId, &nBytes);
1153 Assert(nBytes == strlen(msg));
1158 pg_fatal(
"unrecognized command received from leader: \"%s\"",
1169 char *
buf,
int buflen)
1193 sscanf(msg,
"OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1196 Assert(nBytes == strlen(msg));
1201 pg_fatal(
"invalid message received from worker: \"%s\"",
1222 void *callback_data)
1240 pstate->
te[worker] = te;
1320 if (strcmp(te->
desc,
"BLOBS") == 0)
1333 pg_fatal(
"could not obtain lock on relation \"%s\"\n"
1334 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1335 "on the table after the pg_dump parent process had gotten the "
1336 "initial ACCESS SHARE lock on the table.", qualId);
1422 pg_fatal(
"a worker process died unexpectedly");
1436 pstate->
te[worker] = NULL;
1439 pg_fatal(
"invalid message received from worker: \"%s\"",
1541 int len = strlen(
str) + 1;
1544 pg_fatal(
"could not write to the communication channel: %m");
1555 fd_set saveSet = *workerset;
1559 *workerset = saveSet;
1560 i =
select(maxFd + 1, workerset, NULL, NULL, NULL);
1563 if (
i < 0 && errno ==
EINTR)
1566 if (
i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1596 struct timeval nowait = {0, 0};
1599 FD_ZERO(&workerset);
1616 if ((
i =
select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1621 pg_fatal(
"%s() failed: %m",
"select");
1658 int len = strlen(
str) + 1;
1662 pg_fatal(
"could not write to the communication channel: %m");
1702 if (msg[msgsize] ==
'\0')
1735 struct sockaddr_in serv_addr;
1736 int len =
sizeof(serv_addr);
1739 handles[0] = handles[1] = -1;
1746 pg_log_error(
"pgpipe: could not create socket: error code %d",
1751 memset(&serv_addr, 0,
sizeof(serv_addr));
1752 serv_addr.sin_family = AF_INET;
1754 serv_addr.sin_addr.s_addr =
pg_hton32(INADDR_LOOPBACK);
1755 if (
bind(s, (SOCKADDR *) &serv_addr,
len) == SOCKET_ERROR)
1762 if (
listen(s, 1) == SOCKET_ERROR)
1764 pg_log_error(
"pgpipe: could not listen: error code %d",
1769 if (getsockname(s, (SOCKADDR *) &serv_addr, &
len) == SOCKET_ERROR)
1771 pg_log_error(
"pgpipe: %s() failed: error code %d",
"getsockname",
1782 pg_log_error(
"pgpipe: could not create second socket: error code %d",
1787 handles[1] = (int) tmp_sock;
1789 if (
connect(handles[1], (SOCKADDR *) &serv_addr,
len) == SOCKET_ERROR)
1791 pg_log_error(
"pgpipe: could not connect socket: error code %d",
1800 pg_log_error(
"pgpipe: could not accept connection: error code %d",
1807 handles[0] = (int) tmp_sock;
struct WorkerInfoData * WorkerInfo
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
static void sendMessageToLeader(int pipefd[2], const char *str)
struct DumpSignalInformation DumpSignalInformation
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
static bool HasEveryWorkerTerminated(ParallelState *pstate)
void replace_on_exit_close_archive(Archive *AHX)
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
static void sigTermHandler(SIGNAL_ARGS)
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
static char * readMessageFromPipe(int fd)
static int select_loop(int maxFd, fd_set *workerset)
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
static int GetIdleWorker(ParallelState *pstate)
static void set_cancel_pstate(ParallelState *pstate)
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
static void archive_close_connection(int code, void *arg)
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
static ShutdownInformation shutdown_info
void on_exit_close_archive(Archive *AHX)
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
#define WORKER_IS_RUNNING(workerStatus)
static char * getMessageFromLeader(int pipefd[2])
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
#define piperead(a, b, c)
struct ShutdownInformation ShutdownInformation
#define pipewrite(a, b, c)
void init_parallel_dump_utils(void)
static void set_cancel_handler(void)
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
static volatile DumpSignalInformation signal_info
bool IsEveryWorkerIdle(ParallelState *pstate)
#define write_stderr(str)
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
#define messageStartsWith(msg, prefix)
static void ShutdownWorkersHard(ParallelState *pstate)
static void WaitForTerminatingWorkers(ParallelState *pstate)
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
void err(int eval, const char *fmt,...)
PGcancel * PQgetCancel(PGconn *conn)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
void PQfreeCancel(PGcancel *cancel)
ExecStatusType PQresultStatus(const PGresult *res)
void PQclear(PGresult *res)
PGresult * PQexec(PGconn *conn, const char *query)
void * pg_malloc(size_t size)
void * pg_malloc0(size_t size)
void * pg_realloc(void *ptr, size_t size)
Assert(PointerIsAligned(start, uint64))
if(TABLE==NULL||TABLE_index==NULL)
#define pg_log_error(...)
void DisconnectDatabase(Archive *AHX)
void DeCloneArchive(ArchiveHandle *AH)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
#define WORKER_IGNORED_ERRORS
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static PgChecksumMode mode
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
PQExpBufferData * PQExpBuffer
static int fd(const char *x, int i)
const char * fmtQualifiedId(const char *schema, const char *id)
PQExpBuffer(* getLocalPQExpBuffer)(void)
ParallelCompletionPtr callback
T_WorkerStatus workerStatus
ParallelSlot * parallelSlot
WorkerJobDumpPtrType WorkerJobDumpPtr
PGcancel *volatile connCancel
WorkerJobRestorePtrType WorkerJobRestorePtr
SetupWorkerPtrType SetupWorkerPtr
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
#define bind(s, addr, addrlen)
#define socket(af, type, protocol)
#define accept(s, addr, addrlen)
#define connect(s, name, namelen)
#define listen(s, backlog)
#define select(n, r, w, e, timeout)