85#define WORKER_IS_RUNNING(workerStatus) \
86 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
133#define piperead(a,b,c) recv(a,b,c,0)
134#define pipewrite(a,b,c) send(a,b,c,0)
139#define pgpipe(a) pipe(a)
140#define piperead(a,b,c) read(a,b,c)
141#define pipewrite(a,b,c) write(a,b,c)
186#define write_stderr(str) \
188 const char *str_ = (str); \
190 rc_ = write(fileno(stderr), str_, strlen(str_)); \
225 int worker,
const char *
str);
228#define messageStartsWith(msg, prefix) \
229 (strncmp(msg, prefix, strlen(prefix)) == 0)
253 pg_fatal(
"%s() failed: error code %d",
"WSAStartup",
err);
458 pid_t pid = wait(&status);
464 if (slot->
pid == pid)
673 if (AH !=
NULL && AH->connCancel !=
NULL)
674 (
void)
PQcancel(AH->connCancel, errbuf,
sizeof(errbuf));
684 errbuf,
sizeof(errbuf));
961 pg_fatal(
"could not create communication channels: %m");
978 wi, 0, &(slot->threadId));
979 slot->hThread = handle;
1003 for (
j = 0;
j <
i;
j++)
1018 pg_fatal(
"could not create worker process: %m");
1111 char *
buf,
int buflen)
1148 pg_fatal(
"unrecognized command received from leader: \"%s\"",
1159 char *
buf,
int buflen)
1183 sscanf(msg,
"OK %d %d %d%n", &dumpId, &status, &n_errors, &
nBytes);
1191 pg_fatal(
"invalid message received from worker: \"%s\"",
1212 void *callback_data)
1230 pstate->
te[worker] = te;
1323 pg_fatal(
"could not obtain lock on relation \"%s\"\n"
1324 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1325 "on the table after the pg_dump parent process had gotten the "
1326 "initial ACCESS SHARE lock on the table.",
qualId);
1412 pg_fatal(
"a worker process died unexpectedly");
1426 pstate->
te[worker] =
NULL;
1429 pg_fatal(
"invalid message received from worker: \"%s\"",
1534 pg_fatal(
"could not write to the communication channel: %m");
1586 struct timeval nowait = {0, 0};
1611 pg_fatal(
"%s() failed: %m",
"select");
1652 pg_fatal(
"could not write to the communication channel: %m");
1736 pg_log_error(
"pgpipe: could not create socket: error code %d",
1754 pg_log_error(
"pgpipe: could not listen: error code %d",
1761 pg_log_error(
"pgpipe: %s() failed: error code %d",
"getsockname",
1772 pg_log_error(
"pgpipe: could not create second socket: error code %d",
1781 pg_log_error(
"pgpipe: could not connect socket: error code %d",
1790 pg_log_error(
"pgpipe: could not accept connection: error code %d",
struct WorkerInfoData * WorkerInfo
void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
static void sendMessageToLeader(int pipefd[2], const char *str)
static ParallelSlot * GetMyPSlot(ParallelState *pstate)
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2])
void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
static bool HasEveryWorkerTerminated(ParallelState *pstate)
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
static void sigTermHandler(SIGNAL_ARGS)
ParallelState * ParallelBackupStart(ArchiveHandle *AH)
static char * readMessageFromPipe(int fd)
static int select_loop(int maxFd, fd_set *workerset)
static int parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, const char *msg)
static int GetIdleWorker(ParallelState *pstate)
static void set_cancel_pstate(ParallelState *pstate)
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
static void buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, char *buf, int buflen)
static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
static void archive_close_connection(int code, void *arg)
static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
static ShutdownInformation shutdown_info
void on_exit_close_archive(Archive *AHX)
void DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te, T_Action act, ParallelCompletionPtr callback, void *callback_data)
#define WORKER_IS_RUNNING(workerStatus)
static char * getMessageFromLeader(int pipefd[2])
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
#define piperead(a, b, c)
#define pipewrite(a, b, c)
void init_parallel_dump_utils(void)
static void set_cancel_handler(void)
static void buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, char *buf, int buflen)
static volatile DumpSignalInformation signal_info
bool IsEveryWorkerIdle(ParallelState *pstate)
#define write_stderr(str)
static void parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, const char *msg)
#define messageStartsWith(msg, prefix)
static void ShutdownWorkersHard(ParallelState *pstate)
static void WaitForTerminatingWorkers(ParallelState *pstate)
void set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
void(* ParallelCompletionPtr)(ArchiveHandle *AH, TocEntry *te, int status, void *callback_data)
#define Assert(condition)
void err(int eval, const char *fmt,...)
PGcancel * PQgetCancel(PGconn *conn)
int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize)
void PQfreeCancel(PGcancel *cancel)
PGresult * PQexec(PGconn *conn, const char *query)
void * pg_malloc(size_t size)
void * pg_malloc0(size_t size)
void * pg_realloc(void *ptr, size_t size)
#define pg_log_error(...)
void DisconnectDatabase(Archive *AHX)
void DeCloneArchive(ArchiveHandle *AH)
ArchiveHandle * CloneArchive(ArchiveHandle *AH)
TocEntry * getTocEntryByDumpId(ArchiveHandle *AH, DumpId id)
#define WORKER_IGNORED_ERRORS
void on_exit_nicely(on_exit_nicely_callback function, void *arg)
static PgChecksumMode mode
static char buf[DEFAULT_XLOG_SEG_SIZE]
PQExpBuffer createPQExpBuffer(void)
void resetPQExpBuffer(PQExpBuffer str)
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
PQExpBufferData * PQExpBuffer
static int fd(const char *x, int i)
const char * fmtQualifiedId(const char *schema, const char *id)
PQExpBuffer(* getLocalPQExpBuffer)(void)
ParallelCompletionPtr callback
T_WorkerStatus workerStatus
ParallelSlot * parallelSlot
WorkerJobDumpPtrType WorkerJobDumpPtr
PGcancel *volatile connCancel
WorkerJobRestorePtrType WorkerJobRestorePtr
SetupWorkerPtrType SetupWorkerPtr
static void callback(struct sockaddr *addr, struct sockaddr *mask, void *unused)
#define bind(s, addr, addrlen)
#define socket(af, type, protocol)
#define accept(s, addr, addrlen)
#define connect(s, name, namelen)
#define listen(s, backlog)
#define select(n, r, w, e, timeout)