PostgreSQL Source Code  git master
task.c
Go to the documentation of this file.
1 /*
2  * task.c
3  * framework for parallelizing pg_upgrade's once-in-each-database tasks
4  *
5  * This framework provides an efficient way of running the various
6  * once-in-each-database tasks required by pg_upgrade. Specifically, it
7  * parallelizes these tasks by managing a set of slots that follow a simple
8  * state machine and by using libpq's asynchronous APIs to establish the
9  * connections and run the queries. Callers simply need to create a callback
10  * function and build/execute an UpgradeTask. A simple example follows:
11  *
12  * static void
13  * my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
14  * {
15  * for (int i = 0; i < PQntuples(res); i++)
16  * {
17  * ... process results ...
18  * }
19  * }
20  *
21  * void
22  * my_task(ClusterInfo *cluster)
23  * {
24  * UpgradeTask *task = upgrade_task_create();
25  *
26  * upgrade_task_add_step(task,
27  * "... query text ...",
28  * my_process_cb,
29  * true, // let the task free the PGresult
30  * NULL); // "arg" pointer for callback
31  * upgrade_task_run(task, cluster);
32  * upgrade_task_free(task);
33  * }
34  *
35  * Note that multiple steps can be added to a given task. When there are
36  * multiple steps, the task will run all of the steps consecutively in the same
37  * database connection before freeing the connection and moving on. In other
38  * words, it only ever initiates one connection to each database in the
39  * cluster for a given run.
40  *
41  * Copyright (c) 2024, PostgreSQL Global Development Group
42  * src/bin/pg_upgrade/task.c
43  */
44 
45 #include "postgres_fe.h"
46 
47 #include "common/connect.h"
48 #include "fe_utils/string_utils.h"
49 #include "pg_upgrade.h"
50 
/*
 * Progress counters shared by the slot state machine during a single
 * upgrade_task_run() call; upgrade_task_run() resets both to zero before it
 * starts processing.
 *
 * dbs_complete counts the databases whose every step has been processed.  The
 * task is finished once it equals the number of databases in the cluster.
 *
 * dbs_processing is the index, within the cluster's database array, of the
 * next database that a free slot will claim.  It never drops below
 * dbs_complete.
 */
static int	dbs_complete = 0;
static int	dbs_processing = 0;
64 
65 /*
66  * This struct stores the information for a single step of a task. Note that
67  * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
68  * All steps in a task are run in a single connection before moving on to the
69  * next database (which requires a new connection).
70  */
71 typedef struct UpgradeTaskStep
72 {
73  UpgradeTaskProcessCB process_cb; /* processes the results of the query */
74  bool free_result; /* should we free the result? */
75  void *arg; /* pointer passed to process_cb */
77 
78 /*
79  * This struct is a thin wrapper around an array of steps, i.e.,
80  * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
81  */
83 {
85  int num_steps;
87 };
88 
/*
 * The different states for a parallel slot.
 *
 * FREE must remain the first (zero) value: slots are made free by zeroing
 * them with pg_malloc0() or memset().
 */
typedef enum
{
	FREE,						/* slot available for use in a new database */
	CONNECTING,					/* waiting for connection to be established */
	RUNNING_QUERIES,			/* running/processing queries in the task */
} UpgradeTaskSlotState;
98 
99 /*
100  * We maintain an array of user_opts.jobs slots to execute the task.
101  */
102 typedef struct UpgradeTaskSlot
103 {
104  UpgradeTaskSlotState state; /* state of the slot */
105  int db_idx; /* index of the database assigned to slot */
106  int step_idx; /* index of the current step of task */
107  PGconn *conn; /* current connection managed by slot */
108  bool ready; /* slot is ready for processing */
109  bool select_mode; /* select() mode: true->read, false->write */
110  int sock; /* file descriptor for connection's socket */
112 
113 /*
114  * Initializes an UpgradeTask.
115  */
116 UpgradeTask *
118 {
119  UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
120 
121  task->queries = createPQExpBuffer();
122 
123  /* All tasks must first set a secure search_path. */
124  upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
125 
126  return task;
127 }
128 
129 /*
130  * Frees all storage associated with an UpgradeTask.
131  */
132 void
134 {
136  pg_free(task->steps);
137  pg_free(task);
138 }
139 
140 /*
141  * Adds a step to an UpgradeTask. The steps will be executed in each database
142  * in the order in which they are added.
143  *
144  * task: task object that must have been initialized via upgrade_task_create()
145  * query: the query text
146  * process_cb: function that processes the results of the query
147  * free_result: should we free the PGresult, or leave it to the caller?
148  * arg: pointer to task-specific data that is passed to each callback
149  */
150 void
151 upgrade_task_add_step(UpgradeTask *task, const char *query,
152  UpgradeTaskProcessCB process_cb, bool free_result,
153  void *arg)
154 {
155  UpgradeTaskStep *new_step;
156 
157  task->steps = pg_realloc(task->steps,
158  ++task->num_steps * sizeof(UpgradeTaskStep));
159 
160  new_step = &task->steps[task->num_steps - 1];
161  new_step->process_cb = process_cb;
162  new_step->free_result = free_result;
163  new_step->arg = arg;
164 
165  appendPQExpBuffer(task->queries, "%s;", query);
166 }
167 
168 /*
169  * Build a connection string for the slot's current database and asynchronously
170  * start a new connection, but do not wait for the connection to be
171  * established.
172  */
173 static void
175 {
176  PQExpBufferData conn_opts;
177  DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
178 
179  /* Build connection string with proper quoting */
180  initPQExpBuffer(&conn_opts);
181  appendPQExpBufferStr(&conn_opts, "dbname=");
182  appendConnStrVal(&conn_opts, dbinfo->db_name);
183  appendPQExpBufferStr(&conn_opts, " user=");
184  appendConnStrVal(&conn_opts, os_info.user);
185  appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
186  if (cluster->sockdir)
187  {
188  appendPQExpBufferStr(&conn_opts, " host=");
189  appendConnStrVal(&conn_opts, cluster->sockdir);
190  }
191 
192  slot->conn = PQconnectStart(conn_opts.data);
193 
194  if (!slot->conn)
195  pg_fatal("failed to create connection with connection string: \"%s\"",
196  conn_opts.data);
197 
198  termPQExpBuffer(&conn_opts);
199 }
200 
201 /*
202  * Run the process_cb callback function to process the result of a query, and
203  * free the result if the caller indicated we should do so.
204  */
205 static void
207  const UpgradeTask *task)
208 {
209  UpgradeTaskStep *steps = &task->steps[slot->step_idx];
210  UpgradeTaskProcessCB process_cb = steps->process_cb;
211  DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
212  PGresult *res = PQgetResult(slot->conn);
213 
214  if (PQstatus(slot->conn) == CONNECTION_BAD ||
217  pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
218 
219  /*
220  * We assume that a NULL process_cb callback function means there's
221  * nothing to process. This is primarily intended for the initial step in
222  * every task that sets a safe search_path.
223  */
224  if (process_cb)
225  (*process_cb) (dbinfo, res, steps->arg);
226 
227  if (steps->free_result)
228  PQclear(res);
229 }
230 
231 /*
232  * Advances the state machine for a given slot as necessary.
233  */
234 static void
236 {
238 
239  if (!slot->ready)
240  return;
241 
242  switch (slot->state)
243  {
244  case FREE:
245 
246  /*
247  * If all of the databases in the cluster have been processed or
248  * are currently being processed by other slots, we are done.
249  */
250  if (dbs_processing >= cluster->dbarr.ndbs)
251  return;
252 
253  /*
254  * Claim the next database in the cluster's array and initiate a
255  * new connection.
256  */
257  slot->db_idx = dbs_processing++;
258  slot->state = CONNECTING;
259  start_conn(cluster, slot);
260 
261  return;
262 
263  case CONNECTING:
264 
265  /* Check for connection failure. */
266  status = PQconnectPoll(slot->conn);
267  if (status == PGRES_POLLING_FAILED)
268  pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
269 
270  /* Check whether the connection is still establishing. */
271  if (status != PGRES_POLLING_OK)
272  {
273  slot->select_mode = (status == PGRES_POLLING_READING);
274  return;
275  }
276 
277  /*
278  * Move on to running/processing the queries in the task.
279  */
280  slot->state = RUNNING_QUERIES;
281  slot->select_mode = true; /* wait until ready for reading */
282  if (!PQsendQuery(slot->conn, task->queries->data))
283  pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
284 
285  return;
286 
287  case RUNNING_QUERIES:
288 
289  /*
290  * Consume any available data and clear the read-ready indicator
291  * for the connection.
292  */
293  if (!PQconsumeInput(slot->conn))
294  pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
295 
296  /*
297  * Process any results that are ready so that we can free up this
298  * slot for another database as soon as possible.
299  */
300  for (; slot->step_idx < task->num_steps; slot->step_idx++)
301  {
302  /* If no more results are available yet, move on. */
303  if (PQisBusy(slot->conn))
304  return;
305 
306  process_query_result(cluster, slot, task);
307  }
308 
309  /*
310  * If we just finished processing the result of the last step in
311  * the task, free the slot. We recursively call this function on
312  * the newly-freed slot so that we can start initiating the next
313  * connection immediately instead of waiting for the next loop
314  * through the slots.
315  */
316  dbs_complete++;
317  PQfinish(slot->conn);
318  memset(slot, 0, sizeof(UpgradeTaskSlot));
319  slot->ready = true;
320 
321  process_slot(cluster, slot, task);
322 
323  return;
324  }
325 }
326 
/*
 * Block in select() until at least one descriptor in the given read/write
 * sets is ready, retrying transparently when the call is interrupted by a
 * signal.  The sets are restored from the caller's originals before each
 * attempt.  A maxFd of 0 means there is nothing to wait for.
 *
 * Returns -1 on error, else the number of ready descriptors.
 */
static int
select_loop(int maxFd, fd_set *input, fd_set *output)
{
	const fd_set input_mask = *input;
	const fd_set output_mask = *output;

	if (maxFd == 0)
		return 0;

	for (;;)
	{
		int			nready;

		*input = input_mask;
		*output = output_mask;

		nready = select(maxFd + 1, input, output, NULL, NULL);

#ifdef WIN32
		if (nready != SOCKET_ERROR || WSAGetLastError() != WSAEINTR)
			return nready;
#else
		if (nready >= 0 || errno != EINTR)
			return nready;
#endif
		/* Interrupted by a signal: restore the sets and try again. */
	}
}
358 
359 /*
360  * Wait on the slots to either finish connecting or to receive query results if
361  * possible. This avoids a tight loop in upgrade_task_run().
362  */
363 static void
364 wait_on_slots(UpgradeTaskSlot *slots, int numslots)
365 {
366  fd_set input;
367  fd_set output;
368  int maxFd = 0;
369 
370  FD_ZERO(&input);
371  FD_ZERO(&output);
372 
373  for (int i = 0; i < numslots; i++)
374  {
375  /*
376  * We assume the previous call to process_slot() handled everything
377  * that was marked ready in the previous call to wait_on_slots(), if
378  * any.
379  */
380  slots[i].ready = false;
381 
382  /*
383  * This function should only ever see free slots as we are finishing
384  * processing the last few databases, at which point we don't have any
385  * databases left for them to process. We'll never use these slots
386  * again, so we can safely ignore them.
387  */
388  if (slots[i].state == FREE)
389  continue;
390 
391  /*
392  * Add the socket to the set.
393  */
394  slots[i].sock = PQsocket(slots[i].conn);
395  if (slots[i].sock < 0)
396  pg_fatal("invalid socket");
397  FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
398  maxFd = Max(maxFd, slots[i].sock);
399  }
400 
401  /*
402  * If we found socket(s) to wait on, wait.
403  */
404  if (select_loop(maxFd, &input, &output) == -1)
405  pg_fatal("select() failed: %m");
406 
407  /*
408  * Mark which sockets appear to be ready.
409  */
410  for (int i = 0; i < numslots; i++)
411  slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
412  FD_ISSET(slots[i].sock, &output));
413 }
414 
415 /*
416  * Runs all the steps of the task in every database in the cluster using
417  * user_opts.jobs parallel slots.
418  */
419 void
421 {
422  int jobs = Max(1, user_opts.jobs);
423  UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
424 
425  dbs_complete = 0;
426  dbs_processing = 0;
427 
428  /*
429  * Process every slot the first time round.
430  */
431  for (int i = 0; i < jobs; i++)
432  slots[i].ready = true;
433 
434  while (dbs_complete < cluster->dbarr.ndbs)
435  {
436  for (int i = 0; i < jobs; i++)
437  process_slot(cluster, &slots[i], task);
438 
439  wait_on_slots(slots, jobs);
440  }
441 
442  pg_free(slots);
443 }
#define Max(x, y)
Definition: c.h:989
void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
Definition: cluster.c:108
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
PGconn * PQconnectStart(const char *conninfo)
Definition: fe-connect.c:873
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2597
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7212
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7149
void PQfinish(PGconn *conn)
Definition: fe-connect.c:4893
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7238
ExecStatusType PQresultStatus(const PGresult *res)
Definition: fe-exec.c:3411
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1984
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1416
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2031
PGresult * PQgetResult(PGconn *conn)
Definition: fe-exec.c:2062
void * pg_realloc(void *ptr, size_t size)
Definition: fe_memutils.c:65
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
void pg_free(void *ptr)
Definition: fe_memutils.c:105
FILE * input
FILE * output
int i
Definition: isn.c:73
@ CONNECTION_BAD
Definition: libpq-fe.h:82
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:120
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:123
PostgresPollingStatusType
Definition: libpq-fe.h:109
@ PGRES_POLLING_OK
Definition: libpq-fe.h:113
@ PGRES_POLLING_READING
Definition: libpq-fe.h:111
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:110
void * arg
#define pg_fatal(...)
static struct LogicalRepInfo * dbinfo
OSInfo os_info
Definition: pg_upgrade.c:69
void(* UpgradeTaskProcessCB)(DbInfo *dbinfo, PGresult *res, void *arg)
Definition: pg_upgrade.h:500
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void destroyPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:114
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
UserOpts user_opts
Definition: option.c:30
PGconn * conn
Definition: streamutil.c:55
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:545
char * user
Definition: pg_upgrade.h:344
int step_idx
Definition: task.c:106
int db_idx
Definition: task.c:105
PGconn * conn
Definition: task.c:107
bool select_mode
Definition: task.c:109
UpgradeTaskSlotState state
Definition: task.c:104
bool ready
Definition: task.c:108
bool free_result
Definition: task.c:74
UpgradeTaskProcessCB process_cb
Definition: task.c:73
void * arg
Definition: task.c:75
PQExpBuffer queries
Definition: task.c:86
UpgradeTaskStep * steps
Definition: task.c:84
int num_steps
Definition: task.c:85
int jobs
Definition: pg_upgrade.h:327
Definition: regguts.h:323
static int dbs_processing
Definition: task.c:63
static void wait_on_slots(UpgradeTaskSlot *slots, int numslots)
Definition: task.c:364
static int dbs_complete
Definition: task.c:56
struct UpgradeTaskSlot UpgradeTaskSlot
struct UpgradeTaskStep UpgradeTaskStep
UpgradeTask * upgrade_task_create(void)
Definition: task.c:117
UpgradeTaskSlotState
Definition: task.c:93
@ CONNECTING
Definition: task.c:95
@ RUNNING_QUERIES
Definition: task.c:96
@ FREE
Definition: task.c:94
void upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
Definition: task.c:420
static void process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition: task.c:235
static void start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
Definition: task.c:174
static int select_loop(int maxFd, fd_set *input, fd_set *output)
Definition: task.c:331
void upgrade_task_free(UpgradeTask *task)
Definition: task.c:133
static void process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition: task.c:206
void upgrade_task_add_step(UpgradeTask *task, const char *query, UpgradeTaskProcessCB process_cb, bool free_result, void *arg)
Definition: task.c:151
#define EINTR
Definition: win32_port.h:374
#define select(n, r, w, e, timeout)
Definition: win32_port.h:513