PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
task.c File Reference
#include "postgres_fe.h"
#include "common/connect.h"
#include "fe_utils/string_utils.h"
#include "pg_upgrade.h"
Include dependency graph for task.c:

Go to the source code of this file.

Data Structures

struct  UpgradeTaskStep
 
struct  UpgradeTask
 
struct  UpgradeTaskSlot
 

Typedefs

typedef struct UpgradeTaskStep UpgradeTaskStep
 
typedef enum UpgradeTaskSlotState UpgradeTaskSlotState
 
typedef struct UpgradeTaskSlot UpgradeTaskSlot
 

Enumerations

enum  UpgradeTaskSlotState { FREE , CONNECTING , RUNNING_QUERIES }
 

Functions

UpgradeTask * upgrade_task_create (void)
 
void upgrade_task_free (UpgradeTask *task)
 
void upgrade_task_add_step (UpgradeTask *task, const char *query, UpgradeTaskProcessCB process_cb, bool free_result, void *arg)
 
static void start_conn (const ClusterInfo *cluster, UpgradeTaskSlot *slot)
 
static void process_query_result (const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
 
static void process_slot (const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
 
static int select_loop (int maxFd, fd_set *input, fd_set *output)
 
static void wait_on_slots (UpgradeTaskSlot *slots, int numslots)
 
void upgrade_task_run (const UpgradeTask *task, const ClusterInfo *cluster)
 

Variables

static int dbs_complete
 
static int dbs_processing
 

Typedef Documentation

◆ UpgradeTaskSlot

◆ UpgradeTaskSlotState

◆ UpgradeTaskStep

Enumeration Type Documentation

◆ UpgradeTaskSlotState

Enumerator
FREE 
CONNECTING 
RUNNING_QUERIES 

Definition at line 92 of file task.c.

93{
94 FREE, /* slot available for use in a new database */
95 CONNECTING, /* waiting for connection to be established */
96 RUNNING_QUERIES, /* running/processing queries in the task */
97} UpgradeTaskSlotState;
UpgradeTaskSlotState
Definition: task.c:93
@ CONNECTING
Definition: task.c:95
@ RUNNING_QUERIES
Definition: task.c:96
@ FREE
Definition: task.c:94

Function Documentation

◆ process_query_result()

static void process_query_result ( const ClusterInfo * cluster,
UpgradeTaskSlot * slot,
const UpgradeTask * task 
)
static

Definition at line 205 of file task.c.

207{
208 UpgradeTaskStep *steps = &task->steps[slot->step_idx];
209 UpgradeTaskProcessCB process_cb = steps->process_cb;
210 DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
211 PGresult *res = PQgetResult(slot->conn);
212
213 if (PQstatus(slot->conn) == CONNECTION_BAD ||
214 (PQresultStatus(res) != PGRES_TUPLES_OK &&
215 PQresultStatus(res) != PGRES_COMMAND_OK))
216 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
217
218 /*
219 * We assume that a NULL process_cb callback function means there's
220 * nothing to process. This is primarily intended for the initial step in
221 * every task that sets a safe search_path.
222 */
223 if (process_cb)
224 (*process_cb) (dbinfo, res, steps->arg);
225
226 if (steps->free_result)
227 PQclear(res);
228}
void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
Definition: cluster.c:107
ConnStatusType PQstatus(const PGconn *conn)
Definition: fe-connect.c:7616
char * PQerrorMessage(const PGconn *conn)
Definition: fe-connect.c:7679
#define PQgetResult
Definition: libpq-be-fe.h:246
#define PQclear
Definition: libpq-be-fe.h:245
#define PQresultStatus
Definition: libpq-be-fe.h:247
@ CONNECTION_BAD
Definition: libpq-fe.h:85
@ PGRES_COMMAND_OK
Definition: libpq-fe.h:125
@ PGRES_TUPLES_OK
Definition: libpq-fe.h:128
#define pg_fatal(...)
void(* UpgradeTaskProcessCB)(DbInfo *dbinfo, PGresult *res, void *arg)
Definition: pg_upgrade.h:512
int step_idx
Definition: task.c:106
int db_idx
Definition: task.c:105
PGconn * conn
Definition: task.c:107
bool free_result
Definition: task.c:74
UpgradeTaskProcessCB process_cb
Definition: task.c:73
void * arg
Definition: task.c:75
UpgradeTaskStep * steps
Definition: task.c:84

References UpgradeTaskStep::arg, cluster(), UpgradeTaskSlot::conn, CONNECTION_BAD, UpgradeTaskSlot::db_idx, UpgradeTaskStep::free_result, pg_fatal, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQgetResult, PQresultStatus, PQstatus(), UpgradeTaskStep::process_cb, UpgradeTaskSlot::step_idx, and UpgradeTask::steps.

Referenced by process_slot().

◆ process_slot()

static void process_slot ( const ClusterInfo * cluster,
UpgradeTaskSlot * slot,
const UpgradeTask * task 
)
static

Definition at line 234 of file task.c.

235{
236 PostgresPollingStatusType status;
237
238 if (!slot->ready)
239 return;
240
241 switch (slot->state)
242 {
243 case FREE:
244
245 /*
246 * If all of the databases in the cluster have been processed or
247 * are currently being processed by other slots, we are done.
248 */
249 if (dbs_processing >= cluster->dbarr.ndbs)
250 return;
251
252 /*
253 * Claim the next database in the cluster's array and initiate a
254 * new connection.
255 */
256 slot->db_idx = dbs_processing++;
257 slot->state = CONNECTING;
258 start_conn(cluster, slot);
259
260 return;
261
262 case CONNECTING:
263
264 /* Check for connection failure. */
265 status = PQconnectPoll(slot->conn);
266 if (status == PGRES_POLLING_FAILED)
267 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
268
269 /* Check whether the connection is still establishing. */
270 if (status != PGRES_POLLING_OK)
271 {
272 slot->select_mode = (status == PGRES_POLLING_READING);
273 return;
274 }
275
276 /*
277 * Move on to running/processing the queries in the task.
278 */
279 slot->state = RUNNING_QUERIES;
280 slot->select_mode = true; /* wait until ready for reading */
281 if (!PQsendQuery(slot->conn, task->queries->data))
282 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
283
284 return;
285
286 case RUNNING_QUERIES:
287
288 /*
289 * Consume any available data and clear the read-ready indicator
290 * for the connection.
291 */
292 if (!PQconsumeInput(slot->conn))
293 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
294
295 /*
296 * Process any results that are ready so that we can free up this
297 * slot for another database as soon as possible.
298 */
299 for (; slot->step_idx < task->num_steps; slot->step_idx++)
300 {
301 /* If no more results are available yet, move on. */
302 if (PQisBusy(slot->conn))
303 return;
304
305 process_query_result(cluster, slot, task);
306 }
307
308 /*
309 * If we just finished processing the result of the last step in
310 * the task, free the slot. We recursively call this function on
311 * the newly-freed slot so that we can start initiating the next
312 * connection immediately instead of waiting for the next loop
313 * through the slots.
314 */
315 dbs_complete++;
316 PQfinish(slot->conn);
317 memset(slot, 0, sizeof(UpgradeTaskSlot));
318 slot->ready = true;
319
320 process_slot(cluster, slot, task);
321
322 return;
323 }
324}
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
Definition: fe-connect.c:2911
void PQfinish(PGconn *conn)
Definition: fe-connect.c:5305
int PQconsumeInput(PGconn *conn)
Definition: fe-exec.c:1995
int PQsendQuery(PGconn *conn, const char *query)
Definition: fe-exec.c:1427
int PQisBusy(PGconn *conn)
Definition: fe-exec.c:2042
PostgresPollingStatusType
Definition: libpq-fe.h:114
@ PGRES_POLLING_OK
Definition: libpq-fe.h:118
@ PGRES_POLLING_READING
Definition: libpq-fe.h:116
@ PGRES_POLLING_FAILED
Definition: libpq-fe.h:115
bool select_mode
Definition: task.c:109
UpgradeTaskSlotState state
Definition: task.c:104
bool ready
Definition: task.c:108
PQExpBuffer queries
Definition: task.c:86
int num_steps
Definition: task.c:85
static int dbs_processing
Definition: task.c:63
static int dbs_complete
Definition: task.c:56
static void process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition: task.c:234
static void start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
Definition: task.c:174
static void process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition: task.c:205

References cluster(), UpgradeTaskSlot::conn, CONNECTING, PQExpBufferData::data, UpgradeTaskSlot::db_idx, dbs_complete, dbs_processing, FREE, UpgradeTask::num_steps, pg_fatal, PGRES_POLLING_FAILED, PGRES_POLLING_OK, PGRES_POLLING_READING, PQconnectPoll(), PQconsumeInput(), PQerrorMessage(), PQfinish(), PQisBusy(), PQsendQuery(), process_query_result(), process_slot(), UpgradeTask::queries, UpgradeTaskSlot::ready, RUNNING_QUERIES, UpgradeTaskSlot::select_mode, start_conn(), UpgradeTaskSlot::state, and UpgradeTaskSlot::step_idx.

Referenced by process_slot(), and upgrade_task_run().

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set *  input,
fd_set *  output 
)
static

Definition at line 330 of file task.c.

331{
332 fd_set save_input = *input;
333 fd_set save_output = *output;
334
335 if (maxFd == 0)
336 return 0;
337
338 for (;;)
339 {
340 int i;
341
342 *input = save_input;
343 *output = save_output;
344
345 i = select(maxFd + 1, input, output, NULL, NULL);
346
347#ifndef WIN32
348 if (i < 0 && errno == EINTR)
349 continue;
350#else
351 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
352 continue;
353#endif
354 return i;
355 }
356}
FILE * input
FILE * output
int i
Definition: isn.c:77
#define EINTR
Definition: win32_port.h:364
#define select(n, r, w, e, timeout)
Definition: win32_port.h:503

References EINTR, i, input, output, and select.

Referenced by wait_on_slots().

◆ start_conn()

static void start_conn ( const ClusterInfo * cluster,
UpgradeTaskSlot * slot 
)
static

Definition at line 174 of file task.c.

175{
176 PQExpBufferData conn_opts;
177 DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
178
179 /* Build connection string with proper quoting */
180 initPQExpBuffer(&conn_opts);
181 appendPQExpBufferStr(&conn_opts, "dbname=");
182 appendConnStrVal(&conn_opts, dbinfo->db_name);
183 appendPQExpBufferStr(&conn_opts, " user=");
184 appendConnStrVal(&conn_opts, os_info.user);
185 appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
186 if (cluster->sockdir)
187 {
188 appendPQExpBufferStr(&conn_opts, " host=");
189 appendConnStrVal(&conn_opts, cluster->sockdir);
190 }
191
192 slot->conn = PQconnectStart(conn_opts.data);
193
194 if (!slot->conn)
195 pg_fatal("out of memory");
196
197 termPQExpBuffer(&conn_opts);
198}
PGconn * PQconnectStart(const char *conninfo)
Definition: fe-connect.c:951
OSInfo os_info
Definition: pg_upgrade.c:74
void initPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
Definition: pqexpbuffer.c:265
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
Definition: pqexpbuffer.c:367
void termPQExpBuffer(PQExpBuffer str)
Definition: pqexpbuffer.c:129
void appendConnStrVal(PQExpBuffer buf, const char *str)
Definition: string_utils.c:698
char * db_name
Definition: pg_upgrade.h:199
char * user
Definition: pg_upgrade.h:359

References appendConnStrVal(), appendPQExpBuffer(), appendPQExpBufferStr(), cluster(), UpgradeTaskSlot::conn, PQExpBufferData::data, UpgradeTaskSlot::db_idx, DbInfo::db_name, initPQExpBuffer(), os_info, pg_fatal, PQconnectStart(), termPQExpBuffer(), and OSInfo::user.

Referenced by process_slot().

◆ upgrade_task_add_step()

◆ upgrade_task_create()

UpgradeTask * upgrade_task_create ( void  )

Definition at line 117 of file task.c.

118{
119 UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
120
121 task->queries = createPQExpBuffer();
122
123 /* All tasks must first set a secure search_path. */
124 upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
125
126 return task;
127}
#define ALWAYS_SECURE_SEARCH_PATH_SQL
Definition: connect.h:25
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
PQExpBuffer createPQExpBuffer(void)
Definition: pqexpbuffer.c:72
void upgrade_task_add_step(UpgradeTask *task, const char *query, UpgradeTaskProcessCB process_cb, bool free_result, void *arg)
Definition: task.c:151

References ALWAYS_SECURE_SEARCH_PATH_SQL, createPQExpBuffer(), pg_malloc0(), UpgradeTask::queries, and upgrade_task_add_step().

Referenced by check_for_data_types_usage(), check_for_incompatible_polymorphics(), check_for_isn_and_int8_passing_mismatch(), check_for_not_null_inheritance(), check_for_tables_with_oids(), check_for_unicode_update(), check_for_user_defined_encoding_conversions(), check_for_user_defined_postfix_ops(), check_old_cluster_subscription_state(), get_db_rel_and_slot_infos(), get_loadable_libraries(), and report_extension_updates().

◆ upgrade_task_free()

◆ upgrade_task_run()

void upgrade_task_run ( const UpgradeTask * task,
const ClusterInfo * cluster 
)

Definition at line 419 of file task.c.

420{
421 int jobs = Max(1, user_opts.jobs);
422 UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
423
424 dbs_complete = 0;
425 dbs_processing = 0;
426
427 /*
428 * Process every slot the first time round.
429 */
430 for (int i = 0; i < jobs; i++)
431 slots[i].ready = true;
432
433 while (dbs_complete < cluster->dbarr.ndbs)
434 {
435 for (int i = 0; i < jobs; i++)
436 process_slot(cluster, &slots[i], task);
437
438 wait_on_slots(slots, jobs);
439 }
440
441 pg_free(slots);
442}
#define Max(x, y)
Definition: c.h:1001
UserOpts user_opts
Definition: option.c:30
int jobs
Definition: pg_upgrade.h:338
static void wait_on_slots(UpgradeTaskSlot *slots, int numslots)
Definition: task.c:363

References cluster(), dbs_complete, dbs_processing, i, UserOpts::jobs, Max, pg_free(), pg_malloc0(), process_slot(), user_opts, and wait_on_slots().

Referenced by check_for_data_types_usage(), check_for_incompatible_polymorphics(), check_for_isn_and_int8_passing_mismatch(), check_for_not_null_inheritance(), check_for_tables_with_oids(), check_for_unicode_update(), check_for_user_defined_encoding_conversions(), check_for_user_defined_postfix_ops(), check_old_cluster_subscription_state(), get_db_rel_and_slot_infos(), get_loadable_libraries(), and report_extension_updates().

◆ wait_on_slots()

static void wait_on_slots ( UpgradeTaskSlot * slots,
int  numslots 
)
static

Definition at line 363 of file task.c.

364{
365 fd_set input;
366 fd_set output;
367 int maxFd = 0;
368
369 FD_ZERO(&input);
370 FD_ZERO(&output);
371
372 for (int i = 0; i < numslots; i++)
373 {
374 /*
375 * We assume the previous call to process_slot() handled everything
376 * that was marked ready in the previous call to wait_on_slots(), if
377 * any.
378 */
379 slots[i].ready = false;
380
381 /*
382 * This function should only ever see free slots as we are finishing
383 * processing the last few databases, at which point we don't have any
384 * databases left for them to process. We'll never use these slots
385 * again, so we can safely ignore them.
386 */
387 if (slots[i].state == FREE)
388 continue;
389
390 /*
391 * Add the socket to the set.
392 */
393 slots[i].sock = PQsocket(slots[i].conn);
394 if (slots[i].sock < 0)
395 pg_fatal("invalid socket");
396 FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
397 maxFd = Max(maxFd, slots[i].sock);
398 }
399
400 /*
401 * If we found socket(s) to wait on, wait.
402 */
403 if (select_loop(maxFd, &input, &output) == -1)
404 pg_fatal("%s() failed: %m", "select");
405
406 /*
407 * Mark which sockets appear to be ready.
408 */
409 for (int i = 0; i < numslots; i++)
410 slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
411 FD_ISSET(slots[i].sock, &output));
412}
int PQsocket(const PGconn *conn)
Definition: fe-connect.c:7705
PGconn * conn
Definition: streamutil.c:52
Definition: regguts.h:323
static int select_loop(int maxFd, fd_set *input, fd_set *output)
Definition: task.c:330

References conn, FREE, i, input, Max, output, pg_fatal, PQsocket(), UpgradeTaskSlot::ready, select_loop(), and UpgradeTaskSlot::sock.

Referenced by upgrade_task_run().

Variable Documentation

◆ dbs_complete

int dbs_complete
static

Definition at line 56 of file task.c.

Referenced by process_slot(), and upgrade_task_run().

◆ dbs_processing

int dbs_processing
static

Definition at line 63 of file task.c.

Referenced by process_slot(), and upgrade_task_run().