PostgreSQL Source Code  git master
parallel.c File Reference
#include "postgres_fe.h"
#include <sys/wait.h>
#include "pg_upgrade.h"
Include dependency graph for parallel.c:

Go to the source code of this file.

Functions

void parallel_exec_prog (const char *log_file, const char *opt_log_file, const char *fmt,...)
 
void parallel_transfer_all_new_dbs (DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
 
bool reap_child (bool wait_for_child)
 

Variables

static int parallel_jobs
 

Function Documentation

◆ parallel_exec_prog()

void parallel_exec_prog ( const char *  log_file,
const char *  opt_log_file,
const char *  fmt,
  ... 
)

Definition at line 62 of file parallel.c.

References generate_unaccent_rules::args, exec_prog(), i, UserOpts::jobs, MAX_STRING, parallel_jobs, pg_fatal, pg_free(), pg_malloc(), pg_malloc0(), pg_strdup(), reap_child(), strerror, user_opts, and vsnprintf.

Referenced by create_new_objects(), and generate_old_dump().

64 {
65  va_list args;
66  char cmd[MAX_STRING];
67 
68 #ifndef WIN32
69  pid_t child;
70 #else
71  HANDLE child;
72  exec_thread_arg *new_arg;
73 #endif
74 
75  va_start(args, fmt);
76  vsnprintf(cmd, sizeof(cmd), fmt, args);
77  va_end(args);
78 
79  if (user_opts.jobs <= 1)
80  /* exit_on_error must be true to allow jobs */
81  exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
82  else
83  {
84  /* parallel */
85 #ifdef WIN32
86  if (thread_handles == NULL)
87  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
88 
89  if (exec_thread_args == NULL)
90  {
91  int i;
92 
93  exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
94 
95  /*
96  * For safety and performance, we keep the args allocated during
97  * the entire life of the process, and we don't free the args in a
98  * thread different from the one that allocated it.
99  */
100  for (i = 0; i < user_opts.jobs; i++)
101  exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
102  }
103 
104  cur_thread_args = (void **) exec_thread_args;
105 #endif
106  /* harvest any dead children */
107  while (reap_child(false) == true)
108  ;
109 
110  /* must we wait for a dead child? */
111  if (parallel_jobs >= user_opts.jobs)
112  reap_child(true);
113 
114  /* set this before we start the job */
115  parallel_jobs++;
116 
117  /* Ensure stdio state is quiesced before forking */
118  fflush(NULL);
119 
120 #ifndef WIN32
121  child = fork();
122  if (child == 0)
123  /* use _exit to skip atexit() functions */
124  _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
125  else if (child < 0)
126  /* fork failed */
127  pg_fatal("could not create worker process: %s\n", strerror(errno));
128 #else
129  /* empty array element are always at the end */
130  new_arg = exec_thread_args[parallel_jobs - 1];
131 
132  /* Can only pass one pointer into the function, so use a struct */
133  if (new_arg->log_file)
134  pg_free(new_arg->log_file);
135  new_arg->log_file = pg_strdup(log_file);
136  if (new_arg->opt_log_file)
137  pg_free(new_arg->opt_log_file);
138  new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
139  if (new_arg->cmd)
140  pg_free(new_arg->cmd);
141  new_arg->cmd = pg_strdup(cmd);
142 
143  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
144  new_arg, 0, NULL);
145  if (child == 0)
146  pg_fatal("could not create worker thread: %s\n", strerror(errno));
147 
148  thread_handles[parallel_jobs - 1] = child;
149 #endif
150  }
151 
152  return;
153 }
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
static char * log_file
Definition: pg_ctl.c:91
#define pg_fatal(...)
Definition: pg_rewind.h:43
int jobs
Definition: pg_upgrade.h:295
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
#define vsnprintf
Definition: port.h:191
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:30
#define MAX_STRING
Definition: pg_upgrade.h:18
#define strerror
Definition: port.h:205
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:288
int i
bool exec_prog(const char *log_file, const char *opt_log_file, bool report_error, bool exit_on_error, const char *fmt,...)
Definition: exec.c:80
static int parallel_jobs
Definition: parallel.c:19

◆ parallel_transfer_all_new_dbs()

void parallel_transfer_all_new_dbs ( DbInfoArr *  old_db_arr,
DbInfoArr *  new_db_arr,
char *  old_pgdata,
char *  new_pgdata,
char *  old_tablespace 
)

Definition at line 177 of file parallel.c.

References generate_unaccent_rules::args, i, UserOpts::jobs, parallel_jobs, pg_fatal, pg_free(), pg_malloc(), pg_malloc0(), pg_strdup(), reap_child(), strerror, transfer_all_new_dbs(), and user_opts.

Referenced by transfer_all_new_tablespaces().

180 {
181 #ifndef WIN32
182  pid_t child;
183 #else
184  HANDLE child;
185  transfer_thread_arg *new_arg;
186 #endif
187 
188  if (user_opts.jobs <= 1)
189  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
190  else
191  {
192  /* parallel */
193 #ifdef WIN32
194  if (thread_handles == NULL)
195  thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
196 
197  if (transfer_thread_args == NULL)
198  {
199  int i;
200 
201  transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
202 
203  /*
204  * For safety and performance, we keep the args allocated during
205  * the entire life of the process, and we don't free the args in a
206  * thread different from the one that allocated it.
207  */
208  for (i = 0; i < user_opts.jobs; i++)
209  transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
210  }
211 
212  cur_thread_args = (void **) transfer_thread_args;
213 #endif
214  /* harvest any dead children */
215  while (reap_child(false) == true)
216  ;
217 
218  /* must we wait for a dead child? */
219  if (parallel_jobs >= user_opts.jobs)
220  reap_child(true);
221 
222  /* set this before we start the job */
223  parallel_jobs++;
224 
225  /* Ensure stdio state is quiesced before forking */
226  fflush(NULL);
227 
228 #ifndef WIN32
229  child = fork();
230  if (child == 0)
231  {
232  transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
233  old_tablespace);
234  /* if we take another exit path, it will be non-zero */
235  /* use _exit to skip atexit() functions */
236  _exit(0);
237  }
238  else if (child < 0)
239  /* fork failed */
240  pg_fatal("could not create worker process: %s\n", strerror(errno));
241 #else
242  /* empty array element are always at the end */
243  new_arg = transfer_thread_args[parallel_jobs - 1];
244 
245  /* Can only pass one pointer into the function, so use a struct */
246  new_arg->old_db_arr = old_db_arr;
247  new_arg->new_db_arr = new_db_arr;
248  if (new_arg->old_pgdata)
249  pg_free(new_arg->old_pgdata);
250  new_arg->old_pgdata = pg_strdup(old_pgdata);
251  if (new_arg->new_pgdata)
252  pg_free(new_arg->new_pgdata);
253  new_arg->new_pgdata = pg_strdup(new_pgdata);
254  if (new_arg->old_tablespace)
255  pg_free(new_arg->old_tablespace);
256  new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
257 
258  child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
259  new_arg, 0, NULL);
260  if (child == 0)
261  pg_fatal("could not create worker thread: %s\n", strerror(errno));
262 
263  thread_handles[parallel_jobs - 1] = child;
264 #endif
265  }
266 
267  return;
268 }
void transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: relfilenode.c:88
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define pg_fatal(...)
Definition: pg_rewind.h:43
int jobs
Definition: pg_upgrade.h:295
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
UserOpts user_opts
Definition: option.c:30
#define strerror
Definition: port.h:205
void pg_free(void *ptr)
Definition: fe_memutils.c:105
bool reap_child(bool wait_for_child)
Definition: parallel.c:288
int i
static int parallel_jobs
Definition: parallel.c:19

◆ reap_child()

bool reap_child ( bool  wait_for_child)

Definition at line 288 of file parallel.c.

References UserOpts::jobs, parallel_jobs, pg_fatal, strerror, and user_opts.

Referenced by create_new_objects(), generate_old_dump(), parallel_exec_prog(), parallel_transfer_all_new_dbs(), and transfer_all_new_tablespaces().

289 {
290 #ifndef WIN32
291  int work_status;
292  pid_t child;
293 #else
294  int thread_num;
295  DWORD res;
296 #endif
297 
298  if (user_opts.jobs <= 1 || parallel_jobs == 0)
299  return false;
300 
301 #ifndef WIN32
302  child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
303  if (child == (pid_t) -1)
304  pg_fatal("waitpid() failed: %s\n", strerror(errno));
305  if (child == 0)
306  return false; /* no children, or no dead children */
307  if (work_status != 0)
308  pg_fatal("child process exited abnormally: status %d\n", work_status);
309 #else
310  /* wait for one to finish */
311  thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
312  false, wait_for_child ? INFINITE : 0);
313 
314  if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
315  return false;
316 
317  /* compute thread index in active_threads */
318  thread_num -= WAIT_OBJECT_0;
319 
320  /* get the result */
321  GetExitCodeThread(thread_handles[thread_num], &res);
322  if (res != 0)
323  pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
324 
325  /* dispose of handle to stop leaks */
326  CloseHandle(thread_handles[thread_num]);
327 
328  /* Move last slot into dead child's position */
329  if (thread_num != parallel_jobs - 1)
330  {
331  void *tmp_args;
332 
333  thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
334 
335  /*
336  * Move last active thread arg struct into the now-dead slot, and the
337  * now-dead slot to the end for reuse by the next thread. Though the
338  * thread struct is in use by another thread, we can safely swap the
339  * struct pointers within the array.
340  */
341  tmp_args = cur_thread_args[thread_num];
342  cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
343  cur_thread_args[parallel_jobs - 1] = tmp_args;
344  }
345 #endif
346 
347  /* do this after job has been removed */
348  parallel_jobs--;
349 
350  return true;
351 }
#define pg_fatal(...)
Definition: pg_rewind.h:43
int jobs
Definition: pg_upgrade.h:295
UserOpts user_opts
Definition: option.c:30
#define strerror
Definition: port.h:205
static int parallel_jobs
Definition: parallel.c:19

Variable Documentation

◆ parallel_jobs

int parallel_jobs
static

Definition at line 19 of file parallel.c.

Referenced by parallel_exec_prog(), parallel_transfer_all_new_dbs(), and reap_child().