PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
parallel.c File Reference
#include "postgres_fe.h"
#include <sys/wait.h>
#include "pg_upgrade.h"
Include dependency graph for parallel.c:

Go to the source code of this file.

Functions

void parallel_exec_prog (const char *log_file, const char *opt_log_file, const char *fmt,...)
 
void parallel_transfer_all_new_dbs (DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
 
bool reap_child (bool wait_for_child)
 

Variables

static int parallel_jobs
 

Function Documentation

void parallel_exec_prog ( const char *  log_file,
const char *  opt_log_file,
const char *  fmt,
  ... 
)

Definition at line 63 of file parallel.c.

References generate_unaccent_rules::args, exec_prog(), i, UserOpts::jobs, MAX_STRING, NULL, parallel_jobs, pg_fatal(), pg_free(), pg_malloc(), pg_malloc0(), pg_strdup(), reap_child(), strerror(), user_opts, and vsnprintf().

Referenced by create_new_objects(), and generate_old_dump().

65 {
66  va_list args;
67  char cmd[MAX_STRING];
68 
69 #ifndef WIN32
70  pid_t child;
71 #else
72  HANDLE child;
73  exec_thread_arg *new_arg;
74 #endif
75 
76  va_start(args, fmt);
77  vsnprintf(cmd, sizeof(cmd), fmt, args);
78  va_end(args);
79 
80  if (user_opts.jobs <= 1)
81  /* throw_error must be true to allow jobs */
82  exec_prog(log_file, opt_log_file, true, "%s", cmd);
83  else
84  {
85  /* parallel */
86 #ifdef WIN32
87  if (thread_handles == NULL)
88  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
89 
90  if (exec_thread_args == NULL)
91  {
92  int i;
93 
94  exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
95 
96  /*
97  * For safety and performance, we keep the args allocated during
98  * the entire life of the process, and we don't free the args in a
99  * thread different from the one that allocated it.
100  */
101  for (i = 0; i < user_opts.jobs; i++)
102  exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
103  }
104 
105  cur_thread_args = (void **) exec_thread_args;
106 #endif
107  /* harvest any dead children */
108  while (reap_child(false) == true)
109  ;
110 
111  /* must we wait for a dead child? */
113  reap_child(true);
114 
115  /* set this before we start the job */
116  parallel_jobs++;
117 
118  /* Ensure stdio state is quiesced before forking */
119  fflush(NULL);
120 
121 #ifndef WIN32
122  child = fork();
123  if (child == 0)
124  /* use _exit to skip atexit() functions */
125  _exit(!exec_prog(log_file, opt_log_file, true, "%s", cmd));
126  else if (child < 0)
127  /* fork failed */
128  pg_fatal("could not create worker process: %s\n", strerror(errno));
129 #else
130  /* empty array element are always at the end */
131  new_arg = exec_thread_args[parallel_jobs - 1];
132 
133  /* Can only pass one pointer into the function, so use a struct */
134  if (new_arg->log_file)
135  pg_free(new_arg->log_file);
136  new_arg->log_file = pg_strdup(log_file);
137  if (new_arg->opt_log_file)
138  pg_free(new_arg->opt_log_file);
139  new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
140  if (new_arg->cmd)
141  pg_free(new_arg->cmd);
142  new_arg->cmd = pg_strdup(cmd);
143 
144  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
145  new_arg, 0, NULL);
146  if (child == 0)
147  pg_fatal("could not create worker thread: %s\n", strerror(errno));
148 
149  thread_handles[parallel_jobs - 1] = child;
150 #endif
151  }
152 
153  return;
154 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static char * log_file
Definition: pg_ctl.c:83
void pg_fatal(const char *fmt,...)
Definition: logging.c:83
int jobs
Definition: pg_upgrade.h:298
int int vsnprintf(char *str, size_t count, const char *fmt, va_list args)
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:29
#define MAX_STRING
Definition: pg_upgrade.h:21
#define NULL
Definition: c.h:229
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:290
bool exec_prog(const char *log_file, const char *opt_log_file, bool throw_error, const char *fmt,...)
Definition: exec.c:77
int i
const char * strerror(int errnum)
Definition: strerror.c:19
static int parallel_jobs
Definition: parallel.c:20
void parallel_transfer_all_new_dbs ( DbInfoArr *  old_db_arr,
DbInfoArr *  new_db_arr,
char *  old_pgdata,
char *  new_pgdata,
char *  old_tablespace 
)

Definition at line 178 of file parallel.c.

References i, UserOpts::jobs, NULL, parallel_jobs, pg_fatal(), pg_free(), pg_malloc(), pg_malloc0(), pg_strdup(), reap_child(), strerror(), transfer_all_new_dbs(), and user_opts.

Referenced by transfer_all_new_tablespaces().

181 {
182 #ifndef WIN32
183  pid_t child;
184 #else
185  HANDLE child;
186  transfer_thread_arg *new_arg;
187 #endif
188 
189  if (user_opts.jobs <= 1)
190  /* throw_error must be true to allow jobs */
191  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
192  else
193  {
194  /* parallel */
195 #ifdef WIN32
196  if (thread_handles == NULL)
197  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
198 
199  if (transfer_thread_args == NULL)
200  {
201  int i;
202 
203  transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
204 
205  /*
206  * For safety and performance, we keep the args allocated during
207  * the entire life of the process, and we don't free the args in a
208  * thread different from the one that allocated it.
209  */
210  for (i = 0; i < user_opts.jobs; i++)
211  transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
212  }
213 
214  cur_thread_args = (void **) transfer_thread_args;
215 #endif
216  /* harvest any dead children */
217  while (reap_child(false) == true)
218  ;
219 
220  /* must we wait for a dead child? */
222  reap_child(true);
223 
224  /* set this before we start the job */
225  parallel_jobs++;
226 
227  /* Ensure stdio state is quiesced before forking */
228  fflush(NULL);
229 
230 #ifndef WIN32
231  child = fork();
232  if (child == 0)
233  {
234  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
235  old_tablespace);
236  /* if we take another exit path, it will be non-zero */
237  /* use _exit to skip atexit() functions */
238  _exit(0);
239  }
240  else if (child < 0)
241  /* fork failed */
242  pg_fatal("could not create worker process: %s\n", strerror(errno));
243 #else
244  /* empty array element are always at the end */
245  new_arg = transfer_thread_args[parallel_jobs - 1];
246 
247  /* Can only pass one pointer into the function, so use a struct */
248  new_arg->old_db_arr = old_db_arr;
249  new_arg->new_db_arr = new_db_arr;
250  if (new_arg->old_pgdata)
251  pg_free(new_arg->old_pgdata);
252  new_arg->old_pgdata = pg_strdup(old_pgdata);
253  if (new_arg->new_pgdata)
254  pg_free(new_arg->new_pgdata);
255  new_arg->new_pgdata = pg_strdup(new_pgdata);
256  if (new_arg->old_tablespace)
257  pg_free(new_arg->old_tablespace);
258  new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
259 
260  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
261  new_arg, 0, NULL);
262  if (child == 0)
263  pg_fatal("could not create worker thread: %s\n", strerror(errno));
264 
265  thread_handles[parallel_jobs - 1] = child;
266 #endif
267  }
268 
269  return;
270 }
void transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: relfilenode.c:81
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
void pg_fatal(const char *fmt,...)
Definition: logging.c:83
int jobs
Definition: pg_upgrade.h:298
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:29
#define NULL
Definition: c.h:229
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:290
int i
const char * strerror(int errnum)
Definition: strerror.c:19
static int parallel_jobs
Definition: parallel.c:20
bool reap_child ( bool  wait_for_child)

Definition at line 290 of file parallel.c.

References UserOpts::jobs, parallel_jobs, pg_fatal(), strerror(), user_opts, WEXITSTATUS, and WIFEXITED.

Referenced by create_new_objects(), generate_old_dump(), parallel_exec_prog(), parallel_transfer_all_new_dbs(), and transfer_all_new_tablespaces().

291 {
292 #ifndef WIN32
293  int work_status;
294  int ret;
295 #else
296  int thread_num;
297  DWORD res;
298 #endif
299 
300  if (user_opts.jobs <= 1 || parallel_jobs == 0)
301  return false;
302 
303 #ifndef WIN32
304  ret = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
305 
306  /* no children or, for WNOHANG, no dead children */
307  if (ret <= 0 || !WIFEXITED(work_status))
308  return false;
309 
310  if (WEXITSTATUS(work_status) != 0)
311  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
312 #else
313  /* wait for one to finish */
314  thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
315  false, wait_for_child ? INFINITE : 0);
316 
317  if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
318  return false;
319 
320  /* compute thread index in active_threads */
321  thread_num -= WAIT_OBJECT_0;
322 
323  /* get the result */
324  GetExitCodeThread(thread_handles[thread_num], &res);
325  if (res != 0)
326  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
327 
328  /* dispose of handle to stop leaks */
329  CloseHandle(thread_handles[thread_num]);
330 
331  /* Move last slot into dead child's position */
332  if (thread_num != parallel_jobs - 1)
333  {
334  void *tmp_args;
335 
336  thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
337 
338  /*
339  * Move last active thead arg struct into the now-dead slot, and the
340  * now-dead slot to the end for reuse by the next thread. Though the
341  * thread struct is in use by another thread, we can safely swap the
342  * struct pointers within the array.
343  */
344  tmp_args = cur_thread_args[thread_num];
345  cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
346  cur_thread_args[parallel_jobs - 1] = tmp_args;
347  }
348 #endif
349 
350  /* do this after job has been removed */
351  parallel_jobs--;
352 
353  return true;
354 }
#define WIFEXITED(w)
Definition: win32.h:172
void pg_fatal(const char *fmt,...)
Definition: logging.c:83
int jobs
Definition: pg_upgrade.h:298
UserOpts user_opts
Definition: option.c:29
#define WEXITSTATUS(w)
Definition: win32.h:174
const char * strerror(int errnum)
Definition: strerror.c:19
static int parallel_jobs
Definition: parallel.c:20

Variable Documentation

int parallel_jobs
static

Definition at line 20 of file parallel.c.

Referenced by parallel_exec_prog(), parallel_transfer_all_new_dbs(), and reap_child().