PostgreSQL Source Code  git master
parallel.c
Go to the documentation of this file.
1 /*
2  * parallel.c
3  *
4  * multi-process support
5  *
6  * Copyright (c) 2010-2018, PostgreSQL Global Development Group
7  * src/bin/pg_upgrade/parallel.c
8  */
9 
10 #include "postgres_fe.h"
11 
12 #include <sys/wait.h>
13 #ifdef WIN32
14 #include <io.h>
15 #endif
16 
17 #include "pg_upgrade.h"
18 
19 
/*
 * Number of worker children that have been started and not yet reaped.
 * Incremented just before a job is launched, decremented by reap_child().
 */
static int	parallel_jobs;

#ifdef WIN32

/* Arguments handed to a win32_exec_prog() worker thread. */
typedef struct
{
	char	   *log_file;
	char	   *opt_log_file;
	char	   *cmd;
} exec_thread_arg;

/* Arguments handed to a win32_transfer_all_new_dbs() worker thread. */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char	   *old_pgdata;
	char	   *new_pgdata;
	char	   *old_tablespace;
} transfer_thread_arg;

/* Functions run inside the worker threads. */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);

/*
 * Array holding the handles of all active threads.  There can't be any
 * gaps/zeros among the first parallel_jobs entries, because the array is
 * passed directly to WaitForMultipleObjects().  The per-thread arguments
 * live in separate arrays (below) for that reason.
 */
HANDLE	   *thread_handles;

/* Per-slot argument structs; slot i belongs to thread_handles[i]. */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/* track current thread_args struct so reap_child() can be used for all cases */
void	  **cur_thread_args;

#endif
55 
56 /*
57  * parallel_exec_prog
58  *
59  * This has the same API as exec_prog, except it does parallel execution,
60  * and therefore must throw errors and doesn't return an error status.
61  */
62 void
63 parallel_exec_prog(const char *log_file, const char *opt_log_file,
64  const char *fmt,...)
65 {
66  va_list args;
67  char cmd[MAX_STRING];
68 
69 #ifndef WIN32
70  pid_t child;
71 #else
72  HANDLE child;
73  exec_thread_arg *new_arg;
74 #endif
75 
76  va_start(args, fmt);
77  vsnprintf(cmd, sizeof(cmd), fmt, args);
78  va_end(args);
79 
80  if (user_opts.jobs <= 1)
81  /* exit_on_error must be true to allow jobs */
82  exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
83  else
84  {
85  /* parallel */
86 #ifdef WIN32
87  if (thread_handles == NULL)
88  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
89 
90  if (exec_thread_args == NULL)
91  {
92  int i;
93 
94  exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
95 
96  /*
97  * For safety and performance, we keep the args allocated during
98  * the entire life of the process, and we don't free the args in a
99  * thread different from the one that allocated it.
100  */
101  for (i = 0; i < user_opts.jobs; i++)
102  exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
103  }
104 
105  cur_thread_args = (void **) exec_thread_args;
106 #endif
107  /* harvest any dead children */
108  while (reap_child(false) == true)
109  ;
110 
111  /* must we wait for a dead child? */
113  reap_child(true);
114 
115  /* set this before we start the job */
116  parallel_jobs++;
117 
118  /* Ensure stdio state is quiesced before forking */
119  fflush(NULL);
120 
121 #ifndef WIN32
122  child = fork();
123  if (child == 0)
124  /* use _exit to skip atexit() functions */
125  _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
126  else if (child < 0)
127  /* fork failed */
128  pg_fatal("could not create worker process: %s\n", strerror(errno));
129 #else
130  /* empty array element are always at the end */
131  new_arg = exec_thread_args[parallel_jobs - 1];
132 
133  /* Can only pass one pointer into the function, so use a struct */
134  if (new_arg->log_file)
135  pg_free(new_arg->log_file);
136  new_arg->log_file = pg_strdup(log_file);
137  if (new_arg->opt_log_file)
138  pg_free(new_arg->opt_log_file);
139  new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
140  if (new_arg->cmd)
141  pg_free(new_arg->cmd);
142  new_arg->cmd = pg_strdup(cmd);
143 
144  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
145  new_arg, 0, NULL);
146  if (child == 0)
147  pg_fatal("could not create worker thread: %s\n", strerror(errno));
148 
149  thread_handles[parallel_jobs - 1] = child;
150 #endif
151  }
152 
153  return;
154 }
155 
156 
#ifdef WIN32
/*
 * win32_exec_prog
 *
 * Function run by the worker threads that parallel_exec_prog() starts.
 * Runs exec_prog() on the log files and command stored in *args, with
 * report_error and exit_on_error both true.
 *
 * Returns 0 when exec_prog() returns true and 1 otherwise.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	bool		succeeded;

	succeeded = exec_prog(args->log_file, args->opt_log_file, true, true,
						  "%s", args->cmd);

	/* returning from here terminates the thread */
	return succeeded ? 0 : 1;
}
#endif
169 
170 
171 /*
172  * parallel_transfer_all_new_dbs
173  *
174  * This has the same API as transfer_all_new_dbs, except it does parallel execution
175  * by transferring multiple tablespaces in parallel
176  */
177 void
179  char *old_pgdata, char *new_pgdata,
180  char *old_tablespace)
181 {
182 #ifndef WIN32
183  pid_t child;
184 #else
185  HANDLE child;
186  transfer_thread_arg *new_arg;
187 #endif
188 
189  if (user_opts.jobs <= 1)
190  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
191  else
192  {
193  /* parallel */
194 #ifdef WIN32
195  if (thread_handles == NULL)
196  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
197 
198  if (transfer_thread_args == NULL)
199  {
200  int i;
201 
202  transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
203 
204  /*
205  * For safety and performance, we keep the args allocated during
206  * the entire life of the process, and we don't free the args in a
207  * thread different from the one that allocated it.
208  */
209  for (i = 0; i < user_opts.jobs; i++)
210  transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
211  }
212 
213  cur_thread_args = (void **) transfer_thread_args;
214 #endif
215  /* harvest any dead children */
216  while (reap_child(false) == true)
217  ;
218 
219  /* must we wait for a dead child? */
221  reap_child(true);
222 
223  /* set this before we start the job */
224  parallel_jobs++;
225 
226  /* Ensure stdio state is quiesced before forking */
227  fflush(NULL);
228 
229 #ifndef WIN32
230  child = fork();
231  if (child == 0)
232  {
233  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
234  old_tablespace);
235  /* if we take another exit path, it will be non-zero */
236  /* use _exit to skip atexit() functions */
237  _exit(0);
238  }
239  else if (child < 0)
240  /* fork failed */
241  pg_fatal("could not create worker process: %s\n", strerror(errno));
242 #else
243  /* empty array element are always at the end */
244  new_arg = transfer_thread_args[parallel_jobs - 1];
245 
246  /* Can only pass one pointer into the function, so use a struct */
247  new_arg->old_db_arr = old_db_arr;
248  new_arg->new_db_arr = new_db_arr;
249  if (new_arg->old_pgdata)
250  pg_free(new_arg->old_pgdata);
251  new_arg->old_pgdata = pg_strdup(old_pgdata);
252  if (new_arg->new_pgdata)
253  pg_free(new_arg->new_pgdata);
254  new_arg->new_pgdata = pg_strdup(new_pgdata);
255  if (new_arg->old_tablespace)
256  pg_free(new_arg->old_tablespace);
257  new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
258 
259  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
260  new_arg, 0, NULL);
261  if (child == 0)
262  pg_fatal("could not create worker thread: %s\n", strerror(errno));
263 
264  thread_handles[parallel_jobs - 1] = child;
265 #endif
266  }
267 
268  return;
269 }
270 
271 
#ifdef WIN32
/*
 * win32_transfer_all_new_dbs
 *
 * Function run by the worker threads that parallel_transfer_all_new_dbs()
 * starts.  Unpacks *args and forwards the fields to transfer_all_new_dbs().
 *
 * Always returns 0.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	DbInfoArr  *old_dbs = args->old_db_arr;
	DbInfoArr  *new_dbs = args->new_db_arr;
	char	   *old_dir = args->old_pgdata;
	char	   *new_dir = args->new_pgdata;
	char	   *tablespace = args->old_tablespace;

	transfer_all_new_dbs(old_dbs, new_dbs, old_dir, new_dir, tablespace);

	/* returning from here terminates the thread */
	return 0;
}
#endif
283 
284 
285 /*
286  * collect status from a completed worker child
287  */
288 bool
289 reap_child(bool wait_for_child)
290 {
291 #ifndef WIN32
292  int work_status;
293  int ret;
294 #else
295  int thread_num;
296  DWORD res;
297 #endif
298 
299  if (user_opts.jobs <= 1 || parallel_jobs == 0)
300  return false;
301 
302 #ifndef WIN32
303  ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
304 
305  /* no children or, for WNOHANG, no dead children */
306  if (ret <= 0 || !WIFEXITED(work_status))
307  return false;
308 
309  if (WEXITSTATUS(work_status) != 0)
310  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
311 #else
312  /* wait for one to finish */
313  thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
314  false, wait_for_child ? INFINITE : 0);
315 
316  if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
317  return false;
318 
319  /* compute thread index in active_threads */
320  thread_num -= WAIT_OBJECT_0;
321 
322  /* get the result */
323  GetExitCodeThread(thread_handles[thread_num], &res);
324  if (res != 0)
325  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
326 
327  /* dispose of handle to stop leaks */
328  CloseHandle(thread_handles[thread_num]);
329 
330  /* Move last slot into dead child's position */
331  if (thread_num != parallel_jobs - 1)
332  {
333  void *tmp_args;
334 
335  thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
336 
337  /*
338  * Move last active thead arg struct into the now-dead slot, and the
339  * now-dead slot to the end for reuse by the next thread. Though the
340  * thread struct is in use by another thread, we can safely swap the
341  * struct pointers within the array.
342  */
343  tmp_args = cur_thread_args[thread_num];
344  cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
345  cur_thread_args[parallel_jobs - 1] = tmp_args;
346  }
347 #endif
348 
349  /* do this after job has been removed */
350  parallel_jobs--;
351 
352  return true;
353 }
void parallel_exec_prog(const char *log_file, const char *opt_log_file, const char *fmt,...)
Definition: parallel.c:63
void transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: relfilenode.c:81
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static char * log_file
Definition: pg_ctl.c:88
void pg_fatal(const char *fmt,...)
Definition: logging.c:83
int jobs
Definition: pg_upgrade.h:300
int int vsnprintf(char *str, size_t count, const char *fmt, va_list args)
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: parallel.c:178
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:29
#define MAX_STRING
Definition: pg_upgrade.h:21
#define WIFEXITED(w)
Definition: win32_port.h:147
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:289
int i
const char * strerror(int errnum)
Definition: strerror.c:19
bool exec_prog(const char *log_file, const char *opt_log_file, bool report_error, bool exit_on_error, const char *fmt,...)
Definition: exec.c:80
static int parallel_jobs
Definition: parallel.c:20
#define WEXITSTATUS(w)
Definition: win32_port.h:149