PostgreSQL Source Code git master
Loading...
Searching...
No Matches
task.c
Go to the documentation of this file.
/*
 * task.c
 *		framework for parallelizing pg_upgrade's once-in-each-database tasks
 *
 * This framework provides an efficient way of running the various
 * once-in-each-database tasks required by pg_upgrade.  Specifically, it
 * parallelizes these tasks by managing a set of slots that follow a simple
 * state machine and by using libpq's asynchronous APIs to establish the
 * connections and run the queries.  Callers simply need to create a callback
 * function and build/execute an UpgradeTask.  A simple example follows:
 *
 *		static void
 *		my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
 *		{
 *			for (int i = 0; i < PQntuples(res); i++)
 *			{
 *				... process results ...
 *			}
 *		}
 *
 *		void
 *		my_task(ClusterInfo *cluster)
 *		{
 *			UpgradeTask *task = upgrade_task_create();
 *
 *			upgrade_task_add_step(task,
 *								  "... query text ...",
 *								  my_process_cb,
 *								  true,		// let the task free the PGresult
 *								  NULL);	// "arg" pointer for callback
 *			upgrade_task_run(task, cluster);
 *			upgrade_task_free(task);
 *		}
 *
 * Note that multiple steps can be added to a given task.  When there are
 * multiple steps, the task will run all of the steps consecutively in the same
 * database connection before freeing the connection and moving on.  In other
 * words, it only ever initiates one connection to each database in the
 * cluster for a given run.
 *
 * Copyright (c) 2024-2026, PostgreSQL Global Development Group
 * src/bin/pg_upgrade/task.c
 */
44
#include "postgres_fe.h"

#include "common/connect.h"
#include "fe_utils/string_utils.h"
#include "pg_upgrade.h"
50
/*
 * dbs_complete stores the number of databases that we have completed
 * processing.  When this value equals the number of databases in the cluster,
 * the task is finished.
 *
 * Reset to zero at the start of every upgrade_task_run() call.
 */
static int	dbs_complete;

/*
 * dbs_processing stores the index of the next database in the cluster's array
 * of databases that will be picked up for processing.  It will always be
 * greater than or equal to dbs_complete.
 *
 * Reset to zero at the start of every upgrade_task_run() call.
 */
static int	dbs_processing;
64
65/*
66 * This struct stores the information for a single step of a task. Note that
67 * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
68 * All steps in a task are run in a single connection before moving on to the
69 * next database (which requires a new connection).
70 */
71typedef struct UpgradeTaskStep
72{
73 UpgradeTaskProcessCB process_cb; /* processes the results of the query */
74 bool free_result; /* should we free the result? */
75 void *arg; /* pointer passed to process_cb */
77
78/*
79 * This struct is a thin wrapper around an array of steps, i.e.,
80 * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
81 */
88
/*
 * The different states for a parallel slot.
 *
 * FREE must remain the first member so that its value is zero:
 * process_slot() releases a slot by clearing it with memset(), and the
 * cleared slot is then expected to compare equal to FREE.
 */
typedef enum UpgradeTaskSlotState
{
	FREE,						/* slot available for use in a new database */
	CONNECTING,					/* waiting for connection to be established */
	RUNNING_QUERIES,			/* running/processing queries in the task */
} UpgradeTaskSlotState;
98
99/*
100 * We maintain an array of user_opts.jobs slots to execute the task.
101 */
102typedef struct UpgradeTaskSlot
103{
104 UpgradeTaskSlotState state; /* state of the slot */
105 int db_idx; /* index of the database assigned to slot */
106 int step_idx; /* index of the current step of task */
107 PGconn *conn; /* current connection managed by slot */
108 bool ready; /* slot is ready for processing */
109 bool select_mode; /* select() mode: true->read, false->write */
110 int sock; /* file descriptor for connection's socket */
112
113/*
114 * Initializes an UpgradeTask.
115 */
118{
120
121 task->queries = createPQExpBuffer();
122
123 /* All tasks must first set a secure search_path. */
125
126 return task;
127}
128
129/*
130 * Frees all storage associated with an UpgradeTask.
131 */
132void
134{
135 destroyPQExpBuffer(task->queries);
136 pg_free(task->steps);
137 pg_free(task);
138}
139
140/*
141 * Adds a step to an UpgradeTask. The steps will be executed in each database
142 * in the order in which they are added.
143 *
144 * task: task object that must have been initialized via upgrade_task_create()
145 * query: the query text
146 * process_cb: function that processes the results of the query
147 * free_result: should we free the PGresult, or leave it to the caller?
148 * arg: pointer to task-specific data that is passed to each callback
149 */
150void
152 UpgradeTaskProcessCB process_cb, bool free_result,
153 void *arg)
154{
156
157 task->steps = pg_realloc_array(task->steps, UpgradeTaskStep,
158 ++task->num_steps);
159
160 new_step = &task->steps[task->num_steps - 1];
161 new_step->process_cb = process_cb;
162 new_step->free_result = free_result;
163 new_step->arg = arg;
164
165 appendPQExpBuffer(task->queries, "%s;", query);
166}
167
168/*
169 * Build a connection string for the slot's current database and asynchronously
170 * start a new connection, but do not wait for the connection to be
171 * established.
172 */
173static void
175{
177 DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
178
179 /* Build connection string with proper quoting */
181 appendPQExpBufferStr(&conn_opts, "dbname=");
185 appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
186 if (cluster->sockdir)
187 {
190 }
192 appendPQExpBufferStr(&conn_opts, " max_protocol_version=3.0");
193
194 slot->conn = PQconnectStart(conn_opts.data);
195
196 if (!slot->conn)
197 pg_fatal("out of memory");
198
200}
201
202/*
203 * Run the process_cb callback function to process the result of a query, and
204 * free the result if the caller indicated we should do so.
205 */
206static void
208 const UpgradeTask *task)
209{
210 UpgradeTaskStep *steps = &task->steps[slot->step_idx];
211 UpgradeTaskProcessCB process_cb = steps->process_cb;
212 DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
213 PGresult *res = PQgetResult(slot->conn);
214
215 if (PQstatus(slot->conn) == CONNECTION_BAD ||
218 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
219
220 /*
221 * We assume that a NULL process_cb callback function means there's
222 * nothing to process. This is primarily intended for the initial step in
223 * every task that sets a safe search_path.
224 */
225 if (process_cb)
226 (*process_cb) (dbinfo, res, steps->arg);
227
228 if (steps->free_result)
229 PQclear(res);
230}
231
232/*
233 * Advances the state machine for a given slot as necessary.
234 */
235static void
237{
239
240 if (!slot->ready)
241 return;
242
243 switch (slot->state)
244 {
245 case FREE:
246
247 /*
248 * If all of the databases in the cluster have been processed or
249 * are currently being processed by other slots, we are done.
250 */
251 if (dbs_processing >= cluster->dbarr.ndbs)
252 return;
253
254 /*
255 * Claim the next database in the cluster's array and initiate a
256 * new connection.
257 */
258 slot->db_idx = dbs_processing++;
259 slot->state = CONNECTING;
260 start_conn(cluster, slot);
261
262 return;
263
264 case CONNECTING:
265
266 /* Check for connection failure. */
267 status = PQconnectPoll(slot->conn);
268 if (status == PGRES_POLLING_FAILED)
269 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
270
271 /* Check whether the connection is still establishing. */
272 if (status != PGRES_POLLING_OK)
273 {
274 slot->select_mode = (status == PGRES_POLLING_READING);
275 return;
276 }
277
278 /*
279 * Move on to running/processing the queries in the task.
280 */
281 slot->state = RUNNING_QUERIES;
282 slot->select_mode = true; /* wait until ready for reading */
283 if (!PQsendQuery(slot->conn, task->queries->data))
284 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
285
286 return;
287
288 case RUNNING_QUERIES:
289
290 /*
291 * Consume any available data and clear the read-ready indicator
292 * for the connection.
293 */
294 if (!PQconsumeInput(slot->conn))
295 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
296
297 /*
298 * Process any results that are ready so that we can free up this
299 * slot for another database as soon as possible.
300 */
301 for (; slot->step_idx < task->num_steps; slot->step_idx++)
302 {
303 /* If no more results are available yet, move on. */
304 if (PQisBusy(slot->conn))
305 return;
306
308 }
309
310 /*
311 * If we just finished processing the result of the last step in
312 * the task, free the slot. We recursively call this function on
313 * the newly-freed slot so that we can start initiating the next
314 * connection immediately instead of waiting for the next loop
315 * through the slots.
316 */
317 dbs_complete++;
318 PQfinish(slot->conn);
319 memset(slot, 0, sizeof(UpgradeTaskSlot));
320 slot->ready = true;
321
322 process_slot(cluster, slot, task);
323
324 return;
325 }
326}
327
/*
 * Wait, without a timeout, until at least one descriptor in "input" is
 * readable or one in "output" is writable.
 *
 * maxFd is the highest descriptor present in either set; a value of zero is
 * taken to mean that there is nothing to wait for.  On return the sets hold
 * only the descriptors that are ready.
 *
 * Returns -1 on error, else the number of ready descriptors.
 */
static int
select_loop(int maxFd, fd_set *input, fd_set *output)
{
	fd_set		save_input = *input;
	fd_set		save_output = *output;

	if (maxFd == 0)
		return 0;

	for (;;)
	{
		int			i;

		/*
		 * select() may modify the sets, so start every attempt from the
		 * caller's original contents.
		 */
		*input = save_input;
		*output = save_output;

		i = select(maxFd + 1, input, output, NULL, NULL);

		/* Retry if we were merely interrupted by a signal. */
#ifndef WIN32
		if (i < 0 && errno == EINTR)
			continue;
#else
		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
			continue;
#endif
		return i;
	}
}
359
360/*
361 * Wait on the slots to either finish connecting or to receive query results if
362 * possible. This avoids a tight loop in upgrade_task_run().
363 */
364static void
365wait_on_slots(UpgradeTaskSlot *slots, int numslots)
366{
369 int maxFd = 0;
370
371 FD_ZERO(&input);
372 FD_ZERO(&output);
373
374 for (int i = 0; i < numslots; i++)
375 {
376 /*
377 * We assume the previous call to process_slot() handled everything
378 * that was marked ready in the previous call to wait_on_slots(), if
379 * any.
380 */
381 slots[i].ready = false;
382
383 /*
384 * This function should only ever see free slots as we are finishing
385 * processing the last few databases, at which point we don't have any
386 * databases left for them to process. We'll never use these slots
387 * again, so we can safely ignore them.
388 */
389 if (slots[i].state == FREE)
390 continue;
391
392 /*
393 * Add the socket to the set.
394 */
395 slots[i].sock = PQsocket(slots[i].conn);
396 if (slots[i].sock < 0)
397 pg_fatal("invalid socket");
398 FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
399 maxFd = Max(maxFd, slots[i].sock);
400 }
401
402 /*
403 * If we found socket(s) to wait on, wait.
404 */
405 if (select_loop(maxFd, &input, &output) == -1)
406 pg_fatal("%s() failed: %m", "select");
407
408 /*
409 * Mark which sockets appear to be ready.
410 */
411 for (int i = 0; i < numslots; i++)
412 slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
413 FD_ISSET(slots[i].sock, &output));
414}
415
416/*
417 * Runs all the steps of the task in every database in the cluster using
418 * user_opts.jobs parallel slots.
419 */
420void
422{
423 int jobs = Max(1, user_opts.jobs);
425
426 dbs_complete = 0;
427 dbs_processing = 0;
428
429 /*
430 * Process every slot the first time round.
431 */
432 for (int i = 0; i < jobs; i++)
433 slots[i].ready = true;
434
435 while (dbs_complete < cluster->dbarr.ndbs)
436 {
437 for (int i = 0; i < jobs; i++)
438 process_slot(cluster, &slots[i], task);
439
440 wait_on_slots(slots, jobs);
441 }
442
443 pg_free(slots);
444}
#define Max(x, y)
Definition c.h:1013
void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
Definition cluster.c:107
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition connect.h:25
Datum arg
Definition elog.c:1322
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
ConnStatusType PQstatus(const PGconn *conn)
PGconn * PQconnectStart(const char *conninfo)
Definition fe-connect.c:954
void PQfinish(PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
int PQsocket(const PGconn *conn)
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
int PQsendQuery(PGconn *conn, const char *query)
Definition fe-exec.c:1433
int PQisBusy(PGconn *conn)
Definition fe-exec.c:2048
void pg_free(void *ptr)
#define pg_realloc_array(pointer, type, count)
Definition fe_memutils.h:63
#define pg_malloc0_object(type)
Definition fe_memutils.h:51
#define pg_malloc0_array(type, count)
Definition fe_memutils.h:57
FILE * input
FILE * output
int i
Definition isn.c:77
#define PQgetResult
#define PQclear
#define PQresultStatus
@ CONNECTION_BAD
Definition libpq-fe.h:85
@ PGRES_COMMAND_OK
Definition libpq-fe.h:125
@ PGRES_TUPLES_OK
Definition libpq-fe.h:128
PostgresPollingStatusType
Definition libpq-fe.h:114
@ PGRES_POLLING_OK
Definition libpq-fe.h:118
@ PGRES_POLLING_READING
Definition libpq-fe.h:116
@ PGRES_POLLING_FAILED
Definition libpq-fe.h:115
#define pg_fatal(...)
OSInfo os_info
Definition pg_upgrade.c:75
void(* UpgradeTaskProcessCB)(DbInfo *dbinfo, PGresult *res, void *arg)
Definition pg_upgrade.h:523
bool protocol_negotiation_supported(const ClusterInfo *cluster)
Definition version.c:36
PQExpBuffer createPQExpBuffer(void)
Definition pqexpbuffer.c:72
void initPQExpBuffer(PQExpBuffer str)
Definition pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void destroyPQExpBuffer(PQExpBuffer str)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void termPQExpBuffer(PQExpBuffer str)
static int fb(int x)
UserOpts user_opts
Definition option.c:30
PGconn * conn
Definition streamutil.c:52
void appendConnStrVal(PQExpBuffer buf, const char *str)
char * db_name
Definition pg_upgrade.h:206
char * user
Definition pg_upgrade.h:366
PGconn * conn
Definition task.c:107
bool select_mode
Definition task.c:109
UpgradeTaskSlotState state
Definition task.c:104
bool free_result
Definition task.c:74
UpgradeTaskProcessCB process_cb
Definition task.c:73
void * arg
Definition task.c:75
PQExpBuffer queries
Definition task.c:86
UpgradeTaskStep * steps
Definition task.c:84
int num_steps
Definition task.c:85
static int dbs_processing
Definition task.c:63
UpgradeTask * upgrade_task_create(void)
Definition task.c:117
static void wait_on_slots(UpgradeTaskSlot *slots, int numslots)
Definition task.c:365
static int dbs_complete
Definition task.c:56
UpgradeTaskSlotState
Definition task.c:93
@ CONNECTING
Definition task.c:95
@ RUNNING_QUERIES
Definition task.c:96
@ FREE
Definition task.c:94
void upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
Definition task.c:421
static void process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition task.c:236
static void start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
Definition task.c:174
static int select_loop(int maxFd, fd_set *input, fd_set *output)
Definition task.c:332
void upgrade_task_free(UpgradeTask *task)
Definition task.c:133
static void process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition task.c:207
void upgrade_task_add_step(UpgradeTask *task, const char *query, UpgradeTaskProcessCB process_cb, bool free_result, void *arg)
Definition task.c:151
#define EINTR
Definition win32_port.h:361
#define select(n, r, w, e, timeout)
Definition win32_port.h:500