PostgreSQL Source Code  git master
parallel.c
Go to the documentation of this file.
1 /*
2  * parallel.c
3  *
4  * multi-process support
5  *
6  * Copyright (c) 2010-2019, PostgreSQL Global Development Group
7  * src/bin/pg_upgrade/parallel.c
8  */
9 
10 #include "postgres_fe.h"
11 
12 #include <sys/wait.h>
13 #ifdef WIN32
14 #include <io.h>
15 #endif
16 
17 #include "pg_upgrade.h"
18 
/*
 * Number of worker children (processes, or threads on Windows) that have
 * been started and not yet collected by reap_child().
 */
static int	parallel_jobs = 0;
20 
#ifdef WIN32
/* Argument bundle passed to a win32_exec_prog() worker thread. */
typedef struct
{
	char	   *log_file;
	char	   *opt_log_file;
	char	   *cmd;
} exec_thread_arg;

/* Argument bundle passed to a win32_transfer_all_new_dbs() worker thread. */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char	   *old_pgdata;
	char	   *new_pgdata;
	char	   *old_tablespace;
} transfer_thread_arg;

/*
 * Handles of all active worker threads.  The first parallel_jobs entries
 * are kept contiguous (no gaps/zeros) so the array can be passed directly
 * to WaitForMultipleObjects().  The per-thread arguments live in separate
 * arrays, indexed the same way, for that reason.
 */
HANDLE	   *thread_handles;

/* One argument struct per job slot, for each kind of worker. */
exec_thread_arg **exec_thread_args;
transfer_thread_arg **transfer_thread_args;

/* track current thread_args struct so reap_child() can be used for all cases */
void	  **cur_thread_args;

/* Thread entry points; each receives a pointer to its argument bundle. */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
54 
55 /*
56  * parallel_exec_prog
57  *
58  * This has the same API as exec_prog, except it does parallel execution,
59  * and therefore must throw errors and doesn't return an error status.
60  */
61 void
62 parallel_exec_prog(const char *log_file, const char *opt_log_file,
63  const char *fmt,...)
64 {
65  va_list args;
66  char cmd[MAX_STRING];
67 
68 #ifndef WIN32
69  pid_t child;
70 #else
71  HANDLE child;
72  exec_thread_arg *new_arg;
73 #endif
74 
75  va_start(args, fmt);
76  vsnprintf(cmd, sizeof(cmd), fmt, args);
77  va_end(args);
78 
79  if (user_opts.jobs <= 1)
80  /* exit_on_error must be true to allow jobs */
81  exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
82  else
83  {
84  /* parallel */
85 #ifdef WIN32
86  if (thread_handles == NULL)
87  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
88 
89  if (exec_thread_args == NULL)
90  {
91  int i;
92 
93  exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
94 
95  /*
96  * For safety and performance, we keep the args allocated during
97  * the entire life of the process, and we don't free the args in a
98  * thread different from the one that allocated it.
99  */
100  for (i = 0; i < user_opts.jobs; i++)
101  exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
102  }
103 
104  cur_thread_args = (void **) exec_thread_args;
105 #endif
106  /* harvest any dead children */
107  while (reap_child(false) == true)
108  ;
109 
110  /* must we wait for a dead child? */
112  reap_child(true);
113 
114  /* set this before we start the job */
115  parallel_jobs++;
116 
117  /* Ensure stdio state is quiesced before forking */
118  fflush(NULL);
119 
120 #ifndef WIN32
121  child = fork();
122  if (child == 0)
123  /* use _exit to skip atexit() functions */
124  _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
125  else if (child < 0)
126  /* fork failed */
127  pg_fatal("could not create worker process: %s\n", strerror(errno));
128 #else
129  /* empty array element are always at the end */
130  new_arg = exec_thread_args[parallel_jobs - 1];
131 
132  /* Can only pass one pointer into the function, so use a struct */
133  if (new_arg->log_file)
134  pg_free(new_arg->log_file);
135  new_arg->log_file = pg_strdup(log_file);
136  if (new_arg->opt_log_file)
137  pg_free(new_arg->opt_log_file);
138  new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
139  if (new_arg->cmd)
140  pg_free(new_arg->cmd);
141  new_arg->cmd = pg_strdup(cmd);
142 
143  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
144  new_arg, 0, NULL);
145  if (child == 0)
146  pg_fatal("could not create worker thread: %s\n", strerror(errno));
147 
148  thread_handles[parallel_jobs - 1] = child;
149 #endif
150  }
151 }
152 
153 
#ifdef WIN32
/*
 * win32_exec_prog
 *
 * Worker-thread entry point used by parallel_exec_prog().  Runs the command
 * described by args through exec_prog() and returns 0 when exec_prog()
 * reports success, 1 otherwise; returning ends the thread.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	/* exec_prog() yields true on success, so invert it for the exit code */
	return !exec_prog(args->log_file, args->opt_log_file, true, true,
					  "%s", args->cmd);
}
#endif
166 
167 
168 /*
169  * parallel_transfer_all_new_dbs
170  *
171  * This has the same API as transfer_all_new_dbs, except it does parallel execution
172  * by transferring multiple tablespaces in parallel
173  */
174 void
176  char *old_pgdata, char *new_pgdata,
177  char *old_tablespace)
178 {
179 #ifndef WIN32
180  pid_t child;
181 #else
182  HANDLE child;
183  transfer_thread_arg *new_arg;
184 #endif
185 
186  if (user_opts.jobs <= 1)
187  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
188  else
189  {
190  /* parallel */
191 #ifdef WIN32
192  if (thread_handles == NULL)
193  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
194 
195  if (transfer_thread_args == NULL)
196  {
197  int i;
198 
199  transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
200 
201  /*
202  * For safety and performance, we keep the args allocated during
203  * the entire life of the process, and we don't free the args in a
204  * thread different from the one that allocated it.
205  */
206  for (i = 0; i < user_opts.jobs; i++)
207  transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
208  }
209 
210  cur_thread_args = (void **) transfer_thread_args;
211 #endif
212  /* harvest any dead children */
213  while (reap_child(false) == true)
214  ;
215 
216  /* must we wait for a dead child? */
218  reap_child(true);
219 
220  /* set this before we start the job */
221  parallel_jobs++;
222 
223  /* Ensure stdio state is quiesced before forking */
224  fflush(NULL);
225 
226 #ifndef WIN32
227  child = fork();
228  if (child == 0)
229  {
230  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
231  old_tablespace);
232  /* if we take another exit path, it will be non-zero */
233  /* use _exit to skip atexit() functions */
234  _exit(0);
235  }
236  else if (child < 0)
237  /* fork failed */
238  pg_fatal("could not create worker process: %s\n", strerror(errno));
239 #else
240  /* empty array element are always at the end */
241  new_arg = transfer_thread_args[parallel_jobs - 1];
242 
243  /* Can only pass one pointer into the function, so use a struct */
244  new_arg->old_db_arr = old_db_arr;
245  new_arg->new_db_arr = new_db_arr;
246  if (new_arg->old_pgdata)
247  pg_free(new_arg->old_pgdata);
248  new_arg->old_pgdata = pg_strdup(old_pgdata);
249  if (new_arg->new_pgdata)
250  pg_free(new_arg->new_pgdata);
251  new_arg->new_pgdata = pg_strdup(new_pgdata);
252  if (new_arg->old_tablespace)
253  pg_free(new_arg->old_tablespace);
254  new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
255 
256  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
257  new_arg, 0, NULL);
258  if (child == 0)
259  pg_fatal("could not create worker thread: %s\n", strerror(errno));
260 
261  thread_handles[parallel_jobs - 1] = child;
262 #endif
263  }
264 }
265 
266 
#ifdef WIN32
/*
 * win32_transfer_all_new_dbs
 *
 * Worker-thread entry point used by parallel_transfer_all_new_dbs().
 * Unpacks the argument bundle, calls transfer_all_new_dbs(), and returns 0;
 * returning ends the thread.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	transfer_all_new_dbs(args->old_db_arr,
						 args->new_db_arr,
						 args->old_pgdata,
						 args->new_pgdata,
						 args->old_tablespace);

	/* a plain return terminates the thread with exit code 0 */
	return 0;
}
#endif
278 
279 
280 /*
281  * collect status from a completed worker child
282  */
283 bool
284 reap_child(bool wait_for_child)
285 {
286 #ifndef WIN32
287  int work_status;
288  pid_t child;
289 #else
290  int thread_num;
291  DWORD res;
292 #endif
293 
294  if (user_opts.jobs <= 1 || parallel_jobs == 0)
295  return false;
296 
297 #ifndef WIN32
298  child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
299  if (child == (pid_t) -1)
300  pg_fatal("waitpid() failed: %s\n", strerror(errno));
301  if (child == 0)
302  return false; /* no children, or no dead children */
303  if (work_status != 0)
304  pg_fatal("child process exited abnormally: status %d\n", work_status);
305 #else
306  /* wait for one to finish */
307  thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
308  false, wait_for_child ? INFINITE : 0);
309 
310  if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
311  return false;
312 
313  /* compute thread index in active_threads */
314  thread_num -= WAIT_OBJECT_0;
315 
316  /* get the result */
317  GetExitCodeThread(thread_handles[thread_num], &res);
318  if (res != 0)
319  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
320 
321  /* dispose of handle to stop leaks */
322  CloseHandle(thread_handles[thread_num]);
323 
324  /* Move last slot into dead child's position */
325  if (thread_num != parallel_jobs - 1)
326  {
327  void *tmp_args;
328 
329  thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
330 
331  /*
332  * Move last active thread arg struct into the now-dead slot, and the
333  * now-dead slot to the end for reuse by the next thread. Though the
334  * thread struct is in use by another thread, we can safely swap the
335  * struct pointers within the array.
336  */
337  tmp_args = cur_thread_args[thread_num];
338  cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
339  cur_thread_args[parallel_jobs - 1] = tmp_args;
340  }
341 #endif
342 
343  /* do this after job has been removed */
344  parallel_jobs--;
345 
346  return true;
347 }
void parallel_exec_prog(const char *log_file, const char *opt_log_file, const char *fmt,...)
Definition: parallel.c:62
void transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: relfilenode.c:86
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static char * log_file
Definition: pg_ctl.c:91
#define pg_fatal(...)
Definition: pg_rewind.h:41
int jobs
Definition: pg_upgrade.h:295
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
#define vsnprintf
Definition: port.h:191
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: parallel.c:175
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:30
#define MAX_STRING
Definition: pg_upgrade.h:18
#define strerror
Definition: port.h:205
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:284
int i
bool exec_prog(const char *log_file, const char *opt_log_file, bool report_error, bool exit_on_error, const char *fmt,...)
Definition: exec.c:80
static int parallel_jobs
Definition: parallel.c:19