PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
parallel.c
Go to the documentation of this file.
1 /*
2  * parallel.c
3  *
4  * multi-process support
5  *
6  * Copyright (c) 2010-2017, PostgreSQL Global Development Group
7  * src/bin/pg_upgrade/parallel.c
8  */
9 
10 #include "postgres_fe.h"
11 
12 #include "pg_upgrade.h"
13 
14 #include <stdlib.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/wait.h>
18 
19 #ifdef WIN32
20 #include <io.h>
21 #endif
22 
23 static int parallel_jobs;
24 
25 #ifdef WIN32
26 /*
27  * Array holding all active threads. There can't be any gaps/zeros so
28  * it can be passed to WaitForMultipleObjects(). We use two arrays
29  * so the thread_handles array can be passed to WaitForMultipleObjects().
30  */
31 HANDLE *thread_handles;
32 
33 typedef struct
34 {
35  char *log_file;
36  char *opt_log_file;
37  char *cmd;
38 } exec_thread_arg;
39 
40 typedef struct
41 {
42  DbInfoArr *old_db_arr;
43  DbInfoArr *new_db_arr;
44  char *old_pgdata;
45  char *new_pgdata;
46  char *old_tablespace;
47 } transfer_thread_arg;
48 
49 exec_thread_arg **exec_thread_args;
50 transfer_thread_arg **transfer_thread_args;
51 
52 /* track current thread_args struct so reap_child() can be used for all cases */
53 void **cur_thread_args;
54 
55 DWORD win32_exec_prog(exec_thread_arg *args);
56 DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
57 #endif
58 
59 /*
60  * parallel_exec_prog
61  *
62  * This has the same API as exec_prog, except it does parallel execution,
63  * and therefore must throw errors and doesn't return an error status.
64  */
65 void
66 parallel_exec_prog(const char *log_file, const char *opt_log_file,
67  const char *fmt,...)
68 {
69  va_list args;
70  char cmd[MAX_STRING];
71 
72 #ifndef WIN32
73  pid_t child;
74 #else
75  HANDLE child;
76  exec_thread_arg *new_arg;
77 #endif
78 
79  va_start(args, fmt);
80  vsnprintf(cmd, sizeof(cmd), fmt, args);
81  va_end(args);
82 
83  if (user_opts.jobs <= 1)
84  /* throw_error must be true to allow jobs */
85  exec_prog(log_file, opt_log_file, true, "%s", cmd);
86  else
87  {
88  /* parallel */
89 #ifdef WIN32
90  if (thread_handles == NULL)
91  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
92 
93  if (exec_thread_args == NULL)
94  {
95  int i;
96 
97  exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
98 
99  /*
100  * For safety and performance, we keep the args allocated during
101  * the entire life of the process, and we don't free the args in a
102  * thread different from the one that allocated it.
103  */
104  for (i = 0; i < user_opts.jobs; i++)
105  exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
106  }
107 
108  cur_thread_args = (void **) exec_thread_args;
109 #endif
110  /* harvest any dead children */
111  while (reap_child(false) == true)
112  ;
113 
114  /* must we wait for a dead child? */
116  reap_child(true);
117 
118  /* set this before we start the job */
119  parallel_jobs++;
120 
121  /* Ensure stdio state is quiesced before forking */
122  fflush(NULL);
123 
124 #ifndef WIN32
125  child = fork();
126  if (child == 0)
127  /* use _exit to skip atexit() functions */
128  _exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
129  else if (child < 0)
130  /* fork failed */
131  pg_fatal("could not create worker process: %s\n", strerror(errno));
132 #else
133  /* empty array element are always at the end */
134  new_arg = exec_thread_args[parallel_jobs - 1];
135 
136  /* Can only pass one pointer into the function, so use a struct */
137  if (new_arg->log_file)
138  pg_free(new_arg->log_file);
139  new_arg->log_file = pg_strdup(log_file);
140  if (new_arg->opt_log_file)
141  pg_free(new_arg->opt_log_file);
142  new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
143  if (new_arg->cmd)
144  pg_free(new_arg->cmd);
145  new_arg->cmd = pg_strdup(cmd);
146 
147  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
148  new_arg, 0, NULL);
149  if (child == 0)
150  pg_fatal("could not create worker thread: %s\n", strerror(errno));
151 
152  thread_handles[parallel_jobs - 1] = child;
153 #endif
154  }
155 
156  return;
157 }
158 
159 
160 #ifdef WIN32
161 DWORD
162 win32_exec_prog(exec_thread_arg *args)
163 {
164  int ret;
165 
166  ret = !exec_prog(args->log_file, args->opt_log_file, true, "%s", args->cmd);
167 
168  /* terminates thread */
169  return ret;
170 }
171 #endif
172 
173 
174 /*
175  * parallel_transfer_all_new_dbs
176  *
177  * This has the same API as transfer_all_new_dbs, except it does parallel execution
178  * by transferring multiple tablespaces in parallel
179  */
180 void
182  char *old_pgdata, char *new_pgdata,
183  char *old_tablespace)
184 {
185 #ifndef WIN32
186  pid_t child;
187 #else
188  HANDLE child;
189  transfer_thread_arg *new_arg;
190 #endif
191 
192  if (user_opts.jobs <= 1)
193  /* throw_error must be true to allow jobs */
194  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
195  else
196  {
197  /* parallel */
198 #ifdef WIN32
199  if (thread_handles == NULL)
200  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
201 
202  if (transfer_thread_args == NULL)
203  {
204  int i;
205 
206  transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
207 
208  /*
209  * For safety and performance, we keep the args allocated during
210  * the entire life of the process, and we don't free the args in a
211  * thread different from the one that allocated it.
212  */
213  for (i = 0; i < user_opts.jobs; i++)
214  transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
215  }
216 
217  cur_thread_args = (void **) transfer_thread_args;
218 #endif
219  /* harvest any dead children */
220  while (reap_child(false) == true)
221  ;
222 
223  /* must we wait for a dead child? */
225  reap_child(true);
226 
227  /* set this before we start the job */
228  parallel_jobs++;
229 
230  /* Ensure stdio state is quiesced before forking */
231  fflush(NULL);
232 
233 #ifndef WIN32
234  child = fork();
235  if (child == 0)
236  {
237  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
238  old_tablespace);
239  /* if we take another exit path, it will be non-zero */
240  /* use _exit to skip atexit() functions */
241  _exit(0);
242  }
243  else if (child < 0)
244  /* fork failed */
245  pg_fatal("could not create worker process: %s\n", strerror(errno));
246 #else
247  /* empty array element are always at the end */
248  new_arg = transfer_thread_args[parallel_jobs - 1];
249 
250  /* Can only pass one pointer into the function, so use a struct */
251  new_arg->old_db_arr = old_db_arr;
252  new_arg->new_db_arr = new_db_arr;
253  if (new_arg->old_pgdata)
254  pg_free(new_arg->old_pgdata);
255  new_arg->old_pgdata = pg_strdup(old_pgdata);
256  if (new_arg->new_pgdata)
257  pg_free(new_arg->new_pgdata);
258  new_arg->new_pgdata = pg_strdup(new_pgdata);
259  if (new_arg->old_tablespace)
260  pg_free(new_arg->old_tablespace);
261  new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
262 
263  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
264  new_arg, 0, NULL);
265  if (child == 0)
266  pg_fatal("could not create worker thread: %s\n", strerror(errno));
267 
268  thread_handles[parallel_jobs - 1] = child;
269 #endif
270  }
271 
272  return;
273 }
274 
275 
276 #ifdef WIN32
277 DWORD
278 win32_transfer_all_new_dbs(transfer_thread_arg *args)
279 {
280  transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
281  args->new_pgdata, args->old_tablespace);
282 
283  /* terminates thread */
284  return 0;
285 }
286 #endif
287 
288 
289 /*
290  * collect status from a completed worker child
291  */
292 bool
293 reap_child(bool wait_for_child)
294 {
295 #ifndef WIN32
296  int work_status;
297  int ret;
298 #else
299  int thread_num;
300  DWORD res;
301 #endif
302 
303  if (user_opts.jobs <= 1 || parallel_jobs == 0)
304  return false;
305 
306 #ifndef WIN32
307  ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
308 
309  /* no children or, for WNOHANG, no dead children */
310  if (ret <= 0 || !WIFEXITED(work_status))
311  return false;
312 
313  if (WEXITSTATUS(work_status) != 0)
314  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
315 #else
316  /* wait for one to finish */
317  thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
318  false, wait_for_child ? INFINITE : 0);
319 
320  if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
321  return false;
322 
323  /* compute thread index in active_threads */
324  thread_num -= WAIT_OBJECT_0;
325 
326  /* get the result */
327  GetExitCodeThread(thread_handles[thread_num], &res);
328  if (res != 0)
329  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
330 
331  /* dispose of handle to stop leaks */
332  CloseHandle(thread_handles[thread_num]);
333 
334  /* Move last slot into dead child's position */
335  if (thread_num != parallel_jobs - 1)
336  {
337  void *tmp_args;
338 
339  thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
340 
341  /*
342  * Move last active thead arg struct into the now-dead slot, and the
343  * now-dead slot to the end for reuse by the next thread. Though the
344  * thread struct is in use by another thread, we can safely swap the
345  * struct pointers within the array.
346  */
347  tmp_args = cur_thread_args[thread_num];
348  cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
349  cur_thread_args[parallel_jobs - 1] = tmp_args;
350  }
351 #endif
352 
353  /* do this after job has been removed */
354  parallel_jobs--;
355 
356  return true;
357 }
void parallel_exec_prog(const char *log_file, const char *opt_log_file, const char *fmt,...)
Definition: parallel.c:66
void transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: relfilenode.c:81
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static char * log_file
Definition: pg_ctl.c:86
#define WIFEXITED(w)
Definition: win32.h:180
void pg_fatal(const char *fmt,...)
Definition: logging.c:83
int jobs
Definition: pg_upgrade.h:300
int int vsnprintf(char *str, size_t count, const char *fmt, va_list args)
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: parallel.c:181
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:30
#define MAX_STRING
Definition: pg_upgrade.h:21
#define WEXITSTATUS(w)
Definition: win32.h:182
#define NULL
Definition: c.h:226
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:293
bool exec_prog(const char *log_file, const char *opt_log_file, bool throw_error, const char *fmt,...)
Definition: exec.c:78
int i
const char * strerror(int errnum)
Definition: strerror.c:19
static int parallel_jobs
Definition: parallel.c:23