PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
parallel.c
Go to the documentation of this file.
1 /*
2  * parallel.c
3  *
4  * multi-process support
5  *
6  * Copyright (c) 2010-2017, PostgreSQL Global Development Group
7  * src/bin/pg_upgrade/parallel.c
8  */
9 
10 #include "postgres_fe.h"
11 
12 #include <sys/wait.h>
13 #ifdef WIN32
14 #include <io.h>
15 #endif
16 
17 #include "pg_upgrade.h"
18 
19 
20 static int parallel_jobs;
21 
22 #ifdef WIN32
23 /*
24  * Array holding all active threads. There can't be any gaps/zeros so
25  * it can be passed to WaitForMultipleObjects(). We use two arrays
26  * so the thread_handles array can be passed to WaitForMultipleObjects().
27  */
28 HANDLE *thread_handles;
29 
30 typedef struct
31 {
32  char *log_file;
33  char *opt_log_file;
34  char *cmd;
35 } exec_thread_arg;
36 
37 typedef struct
38 {
39  DbInfoArr *old_db_arr;
40  DbInfoArr *new_db_arr;
41  char *old_pgdata;
42  char *new_pgdata;
43  char *old_tablespace;
44 } transfer_thread_arg;
45 
46 exec_thread_arg **exec_thread_args;
47 transfer_thread_arg **transfer_thread_args;
48 
49 /* track current thread_args struct so reap_child() can be used for all cases */
50 void **cur_thread_args;
51 
52 DWORD win32_exec_prog(exec_thread_arg *args);
53 DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
54 #endif
55 
56 /*
57  * parallel_exec_prog
58  *
59  * This has the same API as exec_prog, except it does parallel execution,
60  * and therefore must throw errors and doesn't return an error status.
61  */
62 void
63 parallel_exec_prog(const char *log_file, const char *opt_log_file,
64  const char *fmt,...)
65 {
66  va_list args;
67  char cmd[MAX_STRING];
68 
69 #ifndef WIN32
70  pid_t child;
71 #else
72  HANDLE child;
73  exec_thread_arg *new_arg;
74 #endif
75 
76  va_start(args, fmt);
77  vsnprintf(cmd, sizeof(cmd), fmt, args);
78  va_end(args);
79 
80  if (user_opts.jobs <= 1)
81  /* throw_error must be true to allow jobs */
82  exec_prog(log_file, opt_log_file, true, "%s", cmd);
83  else
84  {
85  /* parallel */
86 #ifdef WIN32
87  if (thread_handles == NULL)
88  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
89 
90  if (exec_thread_args == NULL)
91  {
92  int i;
93 
94  exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
95 
96  /*
97  * For safety and performance, we keep the args allocated during
98  * the entire life of the process, and we don't free the args in a
99  * thread different from the one that allocated it.
100  */
101  for (i = 0; i < user_opts.jobs; i++)
102  exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
103  }
104 
105  cur_thread_args = (void **) exec_thread_args;
106 #endif
107  /* harvest any dead children */
108  while (reap_child(false) == true)
109  ;
110 
111  /* must we wait for a dead child? */
113  reap_child(true);
114 
115  /* set this before we start the job */
116  parallel_jobs++;
117 
118  /* Ensure stdio state is quiesced before forking */
119  fflush(NULL);
120 
121 #ifndef WIN32
122  child = fork();
123  if (child == 0)
124  /* use _exit to skip atexit() functions */
125  _exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
126  else if (child < 0)
127  /* fork failed */
128  pg_fatal("could not create worker process: %s\n", strerror(errno));
129 #else
130  /* empty array element are always at the end */
131  new_arg = exec_thread_args[parallel_jobs - 1];
132 
133  /* Can only pass one pointer into the function, so use a struct */
134  if (new_arg->log_file)
135  pg_free(new_arg->log_file);
136  new_arg->log_file = pg_strdup(log_file);
137  if (new_arg->opt_log_file)
138  pg_free(new_arg->opt_log_file);
139  new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
140  if (new_arg->cmd)
141  pg_free(new_arg->cmd);
142  new_arg->cmd = pg_strdup(cmd);
143 
144  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
145  new_arg, 0, NULL);
146  if (child == 0)
147  pg_fatal("could not create worker thread: %s\n", strerror(errno));
148 
149  thread_handles[parallel_jobs - 1] = child;
150 #endif
151  }
152 
153  return;
154 }
155 
156 
157 #ifdef WIN32
158 DWORD
159 win32_exec_prog(exec_thread_arg *args)
160 {
161  int ret;
162 
163  ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
164 
165  /* terminates thread */
166  return ret;
167 }
168 #endif
169 
170 
171 /*
172  * parallel_transfer_all_new_dbs
173  *
174  * This has the same API as transfer_all_new_dbs, except it does parallel execution
175  * by transferring multiple tablespaces in parallel
176  */
177 void
179  char *old_pgdata, char *new_pgdata,
180  char *old_tablespace)
181 {
182 #ifndef WIN32
183  pid_t child;
184 #else
185  HANDLE child;
186  transfer_thread_arg *new_arg;
187 #endif
188 
189  if (user_opts.jobs <= 1)
190  /* throw_error must be true to allow jobs */
191  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
192  else
193  {
194  /* parallel */
195 #ifdef WIN32
196  if (thread_handles == NULL)
197  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
198 
199  if (transfer_thread_args == NULL)
200  {
201  int i;
202 
203  transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
204 
205  /*
206  * For safety and performance, we keep the args allocated during
207  * the entire life of the process, and we don't free the args in a
208  * thread different from the one that allocated it.
209  */
210  for (i = 0; i < user_opts.jobs; i++)
211  transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
212  }
213 
214  cur_thread_args = (void **) transfer_thread_args;
215 #endif
216  /* harvest any dead children */
217  while (reap_child(false) == true)
218  ;
219 
220  /* must we wait for a dead child? */
222  reap_child(true);
223 
224  /* set this before we start the job */
225  parallel_jobs++;
226 
227  /* Ensure stdio state is quiesced before forking */
228  fflush(NULL);
229 
230 #ifndef WIN32
231  child = fork();
232  if (child == 0)
233  {
234  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
235  old_tablespace);
236  /* if we take another exit path, it will be non-zero */
237  /* use _exit to skip atexit() functions */
238  _exit(0);
239  }
240  else if (child < 0)
241  /* fork failed */
242  pg_fatal("could not create worker process: %s\n", strerror(errno));
243 #else
244  /* empty array element are always at the end */
245  new_arg = transfer_thread_args[parallel_jobs - 1];
246 
247  /* Can only pass one pointer into the function, so use a struct */
248  new_arg->old_db_arr = old_db_arr;
249  new_arg->new_db_arr = new_db_arr;
250  if (new_arg->old_pgdata)
251  pg_free(new_arg->old_pgdata);
252  new_arg->old_pgdata = pg_strdup(old_pgdata);
253  if (new_arg->new_pgdata)
254  pg_free(new_arg->new_pgdata);
255  new_arg->new_pgdata = pg_strdup(new_pgdata);
256  if (new_arg->old_tablespace)
257  pg_free(new_arg->old_tablespace);
258  new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
259 
260  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
261  new_arg, 0, NULL);
262  if (child == 0)
263  pg_fatal("could not create worker thread: %s\n", strerror(errno));
264 
265  thread_handles[parallel_jobs - 1] = child;
266 #endif
267  }
268 
269  return;
270 }
271 
272 
273 #ifdef WIN32
274 DWORD
275 win32_transfer_all_new_dbs(transfer_thread_arg *args)
276 {
277  transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
278  args->new_pgdata, args->old_tablespace);
279 
280  /* terminates thread */
281  return 0;
282 }
283 #endif
284 
285 
286 /*
287  * collect status from a completed worker child
288  */
289 bool
290 reap_child(bool wait_for_child)
291 {
292 #ifndef WIN32
293  int work_status;
294  int ret;
295 #else
296  int thread_num;
297  DWORD res;
298 #endif
299 
300  if (user_opts.jobs <= 1 || parallel_jobs == 0)
301  return false;
302 
303 #ifndef WIN32
304  ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
305 
306  /* no children or, for WNOHANG, no dead children */
307  if (ret <= 0 || !WIFEXITED(work_status))
308  return false;
309 
310  if (WEXITSTATUS(work_status) != 0)
311  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
312 #else
313  /* wait for one to finish */
314  thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
315  false, wait_for_child ? INFINITE : 0);
316 
317  if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
318  return false;
319 
320  /* compute thread index in active_threads */
321  thread_num -= WAIT_OBJECT_0;
322 
323  /* get the result */
324  GetExitCodeThread(thread_handles[thread_num], &res);
325  if (res != 0)
326  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
327 
328  /* dispose of handle to stop leaks */
329  CloseHandle(thread_handles[thread_num]);
330 
331  /* Move last slot into dead child's position */
332  if (thread_num != parallel_jobs - 1)
333  {
334  void *tmp_args;
335 
336  thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
337 
338  /*
339  * Move last active thead arg struct into the now-dead slot, and the
340  * now-dead slot to the end for reuse by the next thread. Though the
341  * thread struct is in use by another thread, we can safely swap the
342  * struct pointers within the array.
343  */
344  tmp_args = cur_thread_args[thread_num];
345  cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
346  cur_thread_args[parallel_jobs - 1] = tmp_args;
347  }
348 #endif
349 
350  /* do this after job has been removed */
351  parallel_jobs--;
352 
353  return true;
354 }
void parallel_exec_prog(const char *log_file, const char *opt_log_file, const char *fmt,...)
Definition: parallel.c:63
void transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: relfilenode.c:81
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static char * log_file
Definition: pg_ctl.c:83
#define WIFEXITED(w)
Definition: win32.h:180
void pg_fatal(const char *fmt,...)
Definition: logging.c:83
int jobs
Definition: pg_upgrade.h:298
int int vsnprintf(char *str, size_t count, const char *fmt, va_list args)
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: parallel.c:178
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:29
#define MAX_STRING
Definition: pg_upgrade.h:21
#define WEXITSTATUS(w)
Definition: win32.h:182
#define NULL
Definition: c.h:229
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:290
bool exec_prog(const char *log_file, const char *opt_log_file, bool throw_error, const char *fmt,...)
Definition: exec.c:77
int i
const char * strerror(int errnum)
Definition: strerror.c:19
static int parallel_jobs
Definition: parallel.c:20