/*
 * WORKER_IS_RUNNING: true when workerStatus equals WRKR_IDLE or
 * WRKR_WORKING, false for every other status value.
 *
 * The argument is expanded twice, so pass an expression without side
 * effects.
 */
83 #define WORKER_IS_RUNNING(workerStatus) \
84 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
111 unsigned int threadId;
/*
 * Socket-based flavour of the channel primitives.
 *
 * pgpipe() is declared here as a file-local function taking a two-element
 * handle array; piperead()/pipewrite() forward their three arguments to
 * recv()/send() with a flags argument of 0.
 *
 * NOTE(review): the preprocessor conditional that selects this flavour is
 * not visible in this excerpt -- presumably the Windows build; confirm.
 */
130 static int pgpipe(
int handles[2]);
131 #define piperead(a,b,c) recv(a,b,c,0)
132 #define pipewrite(a,b,c) send(a,b,c,0)
/*
 * Descriptor-based flavour of the channel primitives: pgpipe(), piperead()
 * and pipewrite() expand directly to pipe(), read() and write() with the
 * arguments passed through unchanged.
 *
 * NOTE(review): these names are also defined in terms of recv()/send()
 * elsewhere in the file; presumably a preprocessor conditional not visible
 * in this excerpt picks exactly one set -- confirm.
 */
137 #define pgpipe(a) pipe(a)
138 #define piperead(a,b,c) read(a,b,c)
139 #define pipewrite(a,b,c) write(a,b,c)
/*
 * File-local critical section.  In the visible code it is set up with
 * InitializeCriticalSection() and taken/released with
 * EnterCriticalSection()/LeaveCriticalSection(), including inside
 * consoleHandler().
 * NOTE(review): the name suggests it guards the signal_info state; the
 * protected data is not visible in this excerpt -- confirm.
 */
176 static CRITICAL_SECTION signal_info_lock;
184 #define write_stderr(str) \
186 const char *str_ = (str); \
188 rc_ = write(fileno(stderr), str_, strlen(str_)); \
/*
 * Thread-local-storage slot index: assigned from TlsAlloc() and later
 * passed to TlsSetValue() in the visible code.
 */
195 static DWORD tls_index;
/*
 * Initialization flag, false at program start.  The visible code tests it
 * with "if (!parallel_init_done)" before the TlsAlloc()/WSAStartup() setup
 * and sets it to true afterwards; other code paths test it before using
 * tls_index.
 */
198 bool parallel_init_done =
false;
219 static int select_loop(
int maxFd, fd_set *workerset);
223 int worker,
const char *
str);
/*
 * messageStartsWith: true when the first strlen(prefix) bytes of msg
 * compare equal to prefix (via strncmp), i.e. msg begins with prefix.
 *
 * prefix is expanded twice and its length is recomputed on every use, so
 * pass a side-effect-free expression (typically a string literal).
 * Neither argument is parenthesized in the expansion.
 */
226 #define messageStartsWith(msg, prefix) \
227 (strncmp(msg, prefix, strlen(prefix)) == 0)
239 if (!parallel_init_done)
245 tls_index = TlsAlloc();
246 mainThreadId = GetCurrentThreadId();
249 err = WSAStartup(MAKEWORD(2, 2), &wsaData);
251 pg_fatal(
"%s() failed: error code %d",
"WSAStartup",
err);
253 parallel_init_done =
true;
271 if (pstate->
parallelSlot[
i].threadId == GetCurrentThreadId())
289 getThreadLocalPQExpBuffer(
void)
299 if (parallel_init_done)
302 id_return = s_id_return;
313 if (parallel_init_done)
314 TlsSetValue(tls_index, id_return);
316 s_id_return = id_return;
426 EnterCriticalSection(&signal_info_lock);
435 LeaveCriticalSection(&signal_info_lock);
456 pid_t pid = wait(&status);
462 if (slot->
pid == pid)
483 ret = WaitForMultipleObjects(nrun, lpHandles,
false, INFINITE);
484 Assert(ret != WAIT_FAILED);
485 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
492 if (slot->hThread == hThread)
495 CloseHandle((HANDLE) slot->hThread);
496 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
503 Assert(j < pstate->numWorkers);
505 pstate->
te[
j] = NULL;
634 consoleHandler(DWORD dwCtrlType)
639 if (dwCtrlType == CTRL_C_EVENT ||
640 dwCtrlType == CTRL_BREAK_EVENT)
643 EnterCriticalSection(&signal_info_lock);
661 HANDLE hThread = (HANDLE) slot->hThread;
668 if (hThread != INVALID_HANDLE_VALUE)
669 TerminateThread(hThread, 0);
682 errbuf,
sizeof(errbuf));
684 LeaveCriticalSection(&signal_info_lock);
714 InitializeCriticalSection(&signal_info_lock);
716 SetConsoleCtrlHandler(consoleHandler, TRUE);
748 EnterCriticalSection(&signal_info_lock);
756 if (oldConnCancel != NULL)
773 if (mainThreadId == GetCurrentThreadId())
778 LeaveCriticalSection(&signal_info_lock);
792 EnterCriticalSection(&signal_info_lock);
798 LeaveCriticalSection(&signal_info_lock);
812 EnterCriticalSection(&signal_info_lock);
818 LeaveCriticalSection(&signal_info_lock);
873 static unsigned __stdcall
959 pg_fatal(
"could not create communication channels: %m");
975 handle = _beginthreadex(NULL, 0, (
void *) &init_spawned_worker_win32,
976 wi, 0, &(slot->threadId));
977 slot->hThread = handle;
987 slot->
pid = getpid();
1001 for (
j = 0;
j <
i;
j++)
1016 pg_fatal(
"could not create worker process: %m");
1109 char *
buf,
int buflen)
1132 sscanf(msg,
"DUMP %d%n", &dumpId, &nBytes);
1133 Assert(nBytes == strlen(msg));
1140 sscanf(msg,
"RESTORE %d%n", &dumpId, &nBytes);
1141 Assert(nBytes == strlen(msg));
1146 pg_fatal(
"unrecognized command received from leader: \"%s\"",
1157 char *
buf,
int buflen)
1181 sscanf(msg,
"OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1184 Assert(nBytes == strlen(msg));
1189 pg_fatal(
"invalid message received from worker: \"%s\"",
1210 void *callback_data)
1228 pstate->
te[worker] = te;
1308 if (strcmp(te->
desc,
"BLOBS") == 0)
1321 pg_fatal(
"could not obtain lock on relation \"%s\"\n"
1322 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1323 "on the table after the pg_dump parent process had gotten the "
1324 "initial ACCESS SHARE lock on the table.", qualId);
1410 pg_fatal(
"a worker process died unexpectedly");
1424 pstate->
te[worker] = NULL;
1427 pg_fatal(
"invalid message received from worker: \"%s\"",
1529 int len = strlen(
str) + 1;
1532 pg_fatal(
"could not write to the communication channel: %m");
1543 fd_set saveSet = *workerset;
1547 *workerset = saveSet;
1548 i =
select(maxFd + 1, workerset, NULL, NULL, NULL);
1551 if (
i < 0 && errno ==
EINTR)
1554 if (
i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1584 struct timeval nowait = {0, 0};
1587 FD_ZERO(&workerset);
1604 if ((
i =
select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1609 pg_fatal(
"%s() failed: %m",
"select");
1646 int len = strlen(
str) + 1;
1650 pg_fatal(
"could not write to the communication channel: %m");
1690 if (msg[msgsize] ==
'\0')
1723 struct sockaddr_in serv_addr;
1724 int len =
sizeof(serv_addr);
1727 handles[0] = handles[1] = -1;
1734 pg_log_error(
"pgpipe: could not create socket: error code %d",
1739 memset(&serv_addr, 0,
sizeof(serv_addr));
1740 serv_addr.sin_family = AF_INET;
1742 serv_addr.sin_addr.s_addr =
pg_hton32(INADDR_LOOPBACK);
1743 if (
bind(s, (SOCKADDR *) &serv_addr,
len) == SOCKET_ERROR)
1750 if (
listen(s, 1) == SOCKET_ERROR)
1752 pg_log_error(
"pgpipe: could not listen: error code %d",
1757 if (getsockname(s, (SOCKADDR *) &serv_addr, &
len) == SOCKET_ERROR)
1759 pg_log_error(
"pgpipe: %s() failed: error code %d",
"getsockname",
1770 pg_log_error(
"pgpipe: could not create second socket: error code %d",
1775 handles[1] = (int) tmp_sock;
1777 if (
connect(handles[1], (SOCKADDR *) &serv_addr,
len) == SOCKET_ERROR)
1779 pg_log_error(
"pgpipe: could not connect socket: error code %d",
1788 pg_log_error(
"pgpipe: could not accept connection: error code %d",
1795 handles[0] = (int) tmp_sock;
struct WorkerInfoData * WorkerInfo
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
static void sendMessageToLeader(int pipefd[2], const char *str)
struct DumpSignalInformation DumpSignalInformation
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
static bool HasEveryWorkerTerminated(ParallelState *pstate)
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
static void sigTermHandler(SIGNAL_ARGS)
static char * readMessageFromPipe(int fd)
static void setup_cancel_handler(void)
static int select_loop(int maxFd, fd_set *workerset)
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
static int GetIdleWorker(ParallelState *pstate)
static void set_cancel_pstate(ParallelState *pstate)
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
static void archive_close_connection(int code, void *arg)
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
static ShutdownInformation shutdown_info
void on_exit_close_archive(Archive *AHX)
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
#define WORKER_IS_RUNNING(workerStatus)
static char * getMessageFromLeader(int pipefd[2])
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
#define piperead(a, b, c)
struct ShutdownInformation ShutdownInformation
#define pipewrite(a, b, c)
void init_parallel_dump_utils(void)
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
static volatile DumpSignalInformation signal_info
bool IsEveryWorkerIdle(ParallelState *pstate)
#define write_stderr(str)
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
#define messageStartsWith(msg, prefix)
static void ShutdownWorkersHard(ParallelState *pstate)
static void WaitForTerminatingWorkers(ParallelState *pstate)
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
static void PGresult * res
void err(int eval, const char *fmt,...)
PGcancel * PQgetCancel(PGconn *conn)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
void PQfreeCancel(PGcancel *cancel)
ExecStatusType PQresultStatus(const PGresult *res)
PGresult * PQexec(PGconn *conn, const char *query)
void * pg_realloc(void *ptr, size_t size)
void * pg_malloc0(size_t size)
void * pg_malloc(size_t size)
if(TABLE==NULL||TABLE_index==NULL)
static void const char fflush(stdout)
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
void DisconnectDatabase(Archive *AHX)
void DeCloneArchive(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
#define WORKER_IGNORED_ERRORS
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static PgChecksumMode mode
pqsigfunc pqsignal(int signo, pqsigfunc func)
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
PQExpBufferData * PQExpBuffer
static int fd(const char *x, int i)
const char * fmtQualifiedId(const char *schema, const char *id)
PQExpBuffer(* getLocalPQExpBuffer)(void)
ParallelCompletionPtr callback
T_WorkerStatus workerStatus
ParallelSlot * parallelSlot
WorkerJobDumpPtrType WorkerJobDumpPtr
PGcancel *volatile connCancel
WorkerJobRestorePtrType WorkerJobRestorePtr
SetupWorkerPtrType SetupWorkerPtr
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
#define bind(s, addr, addrlen)
#define socket(af, type, protocol)
#define accept(s, addr, addrlen)
#define connect(s, name, namelen)
#define listen(s, backlog)
#define select(n, r, w, e, timeout)