PostgreSQL Source Code git master
parallel.c
Go to the documentation of this file.
1/*
2 * parallel.c
3 *
4 * multi-process support
5 *
6 * Copyright (c) 2010-2025, PostgreSQL Global Development Group
7 * src/bin/pg_upgrade/parallel.c
8 */
9
10#include "postgres_fe.h"
11
12#include <sys/wait.h>
13#ifdef WIN32
14#include <io.h>
15#endif
16
17#include "pg_upgrade.h"
18
19static int parallel_jobs;
20
21#ifdef WIN32
22/*
23 * Array holding all active threads. There can't be any gaps/zeros so
24 * it can be passed to WaitForMultipleObjects(). We use two arrays
25 * so the thread_handles array can be passed to WaitForMultipleObjects().
26 */
27static HANDLE *thread_handles;
28
29typedef struct
30{
31 char *log_file;
32 char *opt_log_file;
33 char *cmd;
34} exec_thread_arg;
35
36typedef struct
37{
38 DbInfoArr *old_db_arr;
39 DbInfoArr *new_db_arr;
40 char *old_pgdata;
41 char *new_pgdata;
42 char *old_tablespace;
43} transfer_thread_arg;
44
45static exec_thread_arg **exec_thread_args;
46static transfer_thread_arg **transfer_thread_args;
47
48/* track current thread_args struct so reap_child() can be used for all cases */
49static void **cur_thread_args;
50
51DWORD win32_exec_prog(exec_thread_arg *args);
52DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args);
53#endif
54
55/*
56 * parallel_exec_prog
57 *
58 * This has the same API as exec_prog, except it does parallel execution,
59 * and therefore must throw errors and doesn't return an error status.
60 */
61void
62parallel_exec_prog(const char *log_file, const char *opt_log_file,
63 const char *fmt,...)
64{
65 va_list args;
66 char cmd[MAX_STRING];
67
68#ifndef WIN32
69 pid_t child;
70#else
71 HANDLE child;
72 exec_thread_arg *new_arg;
73#endif
74
76 vsnprintf(cmd, sizeof(cmd), fmt, args);
77 va_end(args);
78
79 if (user_opts.jobs <= 1)
80 /* exit_on_error must be true to allow jobs */
81 exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
82 else
83 {
84 /* parallel */
85#ifdef WIN32
86 if (thread_handles == NULL)
87 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
88
89 if (exec_thread_args == NULL)
90 {
91 int i;
92
93 exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
94
95 /*
96 * For safety and performance, we keep the args allocated during
97 * the entire life of the process, and we don't free the args in a
98 * thread different from the one that allocated it.
99 */
100 for (i = 0; i < user_opts.jobs; i++)
101 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
102 }
103
104 cur_thread_args = (void **) exec_thread_args;
105#endif
106 /* harvest any dead children */
107 while (reap_child(false) == true)
108 ;
109
110 /* must we wait for a dead child? */
112 reap_child(true);
113
114 /* set this before we start the job */
116
117 /* Ensure stdio state is quiesced before forking */
118 fflush(NULL);
119
120#ifndef WIN32
121 child = fork();
122 if (child == 0)
123 /* use _exit to skip atexit() functions */
124 _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
125 else if (child < 0)
126 /* fork failed */
127 pg_fatal("could not create worker process: %m");
128#else
129 /* empty array element are always at the end */
130 new_arg = exec_thread_args[parallel_jobs - 1];
131
132 /* Can only pass one pointer into the function, so use a struct */
133 pg_free(new_arg->log_file);
134 new_arg->log_file = pg_strdup(log_file);
135 pg_free(new_arg->opt_log_file);
136 new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
137 pg_free(new_arg->cmd);
138 new_arg->cmd = pg_strdup(cmd);
139
140 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
141 new_arg, 0, NULL);
142 if (child == 0)
143 pg_fatal("could not create worker thread: %m");
144
145 thread_handles[parallel_jobs - 1] = child;
146#endif
147 }
148}
149
150
151#ifdef WIN32
152DWORD
153win32_exec_prog(exec_thread_arg *args)
154{
155 int ret;
156
157 ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
158
159 /* terminates thread */
160 return ret;
161}
162#endif
163
164
165/*
166 * parallel_transfer_all_new_dbs
167 *
168 * This has the same API as transfer_all_new_dbs, except it does parallel execution
169 * by transferring multiple tablespaces in parallel
170 */
171void
173 char *old_pgdata, char *new_pgdata,
174 char *old_tablespace)
175{
176#ifndef WIN32
177 pid_t child;
178#else
179 HANDLE child;
180 transfer_thread_arg *new_arg;
181#endif
182
183 if (user_opts.jobs <= 1)
184 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
185 else
186 {
187 /* parallel */
188#ifdef WIN32
189 if (thread_handles == NULL)
190 thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
191
192 if (transfer_thread_args == NULL)
193 {
194 int i;
195
196 transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
197
198 /*
199 * For safety and performance, we keep the args allocated during
200 * the entire life of the process, and we don't free the args in a
201 * thread different from the one that allocated it.
202 */
203 for (i = 0; i < user_opts.jobs; i++)
204 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
205 }
206
207 cur_thread_args = (void **) transfer_thread_args;
208#endif
209 /* harvest any dead children */
210 while (reap_child(false) == true)
211 ;
212
213 /* must we wait for a dead child? */
215 reap_child(true);
216
217 /* set this before we start the job */
219
220 /* Ensure stdio state is quiesced before forking */
221 fflush(NULL);
222
223#ifndef WIN32
224 child = fork();
225 if (child == 0)
226 {
227 transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
228 old_tablespace);
229 /* if we take another exit path, it will be non-zero */
230 /* use _exit to skip atexit() functions */
231 _exit(0);
232 }
233 else if (child < 0)
234 /* fork failed */
235 pg_fatal("could not create worker process: %m");
236#else
237 /* empty array element are always at the end */
238 new_arg = transfer_thread_args[parallel_jobs - 1];
239
240 /* Can only pass one pointer into the function, so use a struct */
241 new_arg->old_db_arr = old_db_arr;
242 new_arg->new_db_arr = new_db_arr;
243 pg_free(new_arg->old_pgdata);
244 new_arg->old_pgdata = pg_strdup(old_pgdata);
245 pg_free(new_arg->new_pgdata);
246 new_arg->new_pgdata = pg_strdup(new_pgdata);
247 pg_free(new_arg->old_tablespace);
248 new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
249
250 child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
251 new_arg, 0, NULL);
252 if (child == 0)
253 pg_fatal("could not create worker thread: %m");
254
255 thread_handles[parallel_jobs - 1] = child;
256#endif
257 }
258}
259
260
261#ifdef WIN32
262DWORD
263win32_transfer_all_new_dbs(transfer_thread_arg *args)
264{
265 transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
266 args->new_pgdata, args->old_tablespace);
267
268 /* terminates thread */
269 return 0;
270}
271#endif
272
273
274/*
275 * collect status from a completed worker child
276 */
277bool
278reap_child(bool wait_for_child)
279{
280#ifndef WIN32
281 int work_status;
282 pid_t child;
283#else
284 int thread_num;
285 DWORD res;
286#endif
287
288 if (user_opts.jobs <= 1 || parallel_jobs == 0)
289 return false;
290
291#ifndef WIN32
292 child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
293 if (child == (pid_t) -1)
294 pg_fatal("%s() failed: %m", "waitpid");
295 if (child == 0)
296 return false; /* no children, or no dead children */
297 if (work_status != 0)
298 pg_fatal("child process exited abnormally: status %d", work_status);
299#else
300 /* wait for one to finish */
301 thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
302 false, wait_for_child ? INFINITE : 0);
303
304 if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
305 return false;
306
307 /* compute thread index in active_threads */
308 thread_num -= WAIT_OBJECT_0;
309
310 /* get the result */
311 GetExitCodeThread(thread_handles[thread_num], &res);
312 if (res != 0)
313 pg_fatal("child worker exited abnormally: %m");
314
315 /* dispose of handle to stop leaks */
316 CloseHandle(thread_handles[thread_num]);
317
318 /* Move last slot into dead child's position */
319 if (thread_num != parallel_jobs - 1)
320 {
321 void *tmp_args;
322
323 thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
324
325 /*
326 * Move last active thread arg struct into the now-dead slot, and the
327 * now-dead slot to the end for reuse by the next thread. Though the
328 * thread struct is in use by another thread, we can safely swap the
329 * struct pointers within the array.
330 */
331 tmp_args = cur_thread_args[thread_num];
332 cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
333 cur_thread_args[parallel_jobs - 1] = tmp_args;
334 }
335#endif
336
337 /* do this after job has been removed */
339
340 return true;
341}
bool exec_prog(const char *log_filename, const char *opt_log_file, bool report_error, bool exit_on_error, const char *fmt,...)
Definition: exec.c:85
static int parallel_jobs
Definition: parallel.c:19
void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: parallel.c:172
bool reap_child(bool wait_for_child)
Definition: parallel.c:278
void parallel_exec_prog(const char *log_file, const char *opt_log_file, const char *fmt,...)
Definition: parallel.c:62
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
int i
Definition: isn.c:72
static void const char * fmt
static void const char fflush(stdout)
va_end(args)
va_start(args, fmt)
#define pg_fatal(...)
static char * log_file
Definition: pg_ctl.c:87
void transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace)
Definition: relfilenumber.c:87
#define MAX_STRING
Definition: pg_upgrade.h:22
#define vsnprintf
Definition: port.h:237
UserOpts user_opts
Definition: option.c:30
int jobs
Definition: pg_upgrade.h:327