PostgreSQL Source Code git master
Loading...
Searching...
No Matches
task.c File Reference
#include "postgres_fe.h"
#include "common/connect.h"
#include "fe_utils/string_utils.h"
#include "pg_upgrade.h"
Include dependency graph for task.c:

Go to the source code of this file.

Data Structures

struct  UpgradeTaskStep
 
struct  UpgradeTask
 
struct  UpgradeTaskSlot
 

Typedefs

typedef struct UpgradeTaskStep UpgradeTaskStep
 
typedef enum UpgradeTaskSlotState UpgradeTaskSlotState
 
typedef struct UpgradeTaskSlot UpgradeTaskSlot
 

Enumerations

enum  UpgradeTaskSlotState { FREE , CONNECTING , RUNNING_QUERIES }
 

Functions

UpgradeTask * upgrade_task_create (void)
 
void upgrade_task_free (UpgradeTask *task)
 
void upgrade_task_add_step (UpgradeTask *task, const char *query, UpgradeTaskProcessCB process_cb, bool free_result, void *arg)
 
static void start_conn (const ClusterInfo *cluster, UpgradeTaskSlot *slot)
 
static void process_query_result (const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
 
static void process_slot (const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
 
static int select_loop (int maxFd, fd_set *input, fd_set *output)
 
static void wait_on_slots (UpgradeTaskSlot *slots, int numslots)
 
void upgrade_task_run (const UpgradeTask *task, const ClusterInfo *cluster)
 

Variables

static int dbs_complete
 
static int dbs_processing
 

Typedef Documentation

◆ UpgradeTaskSlot

◆ UpgradeTaskSlotState

◆ UpgradeTaskStep

Enumeration Type Documentation

◆ UpgradeTaskSlotState

Enumerator
FREE 
CONNECTING 
RUNNING_QUERIES 

Definition at line 92 of file task.c.

93{
94 FREE, /* slot available for use in a new database */
95 CONNECTING, /* waiting for connection to be established */
96 RUNNING_QUERIES, /* running/processing queries in the task */
UpgradeTaskSlotState
Definition task.c:93
@ CONNECTING
Definition task.c:95
@ RUNNING_QUERIES
Definition task.c:96
@ FREE
Definition task.c:94

Function Documentation

◆ process_query_result()

static void process_query_result ( const ClusterInfo * cluster,
UpgradeTaskSlot * slot,
const UpgradeTask * task
)
static

Definition at line 207 of file task.c.

209{
210 UpgradeTaskStep *steps = &task->steps[slot->step_idx];
211 UpgradeTaskProcessCB process_cb = steps->process_cb;
212 DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
213 PGresult *res = PQgetResult(slot->conn);
214
215 if (PQstatus(slot->conn) == CONNECTION_BAD ||
216 (PQresultStatus(res) != PGRES_TUPLES_OK &&
217 PQresultStatus(res) != PGRES_COMMAND_OK))
218 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
219
220 /*
221 * We assume that a NULL process_cb callback function means there's
222 * nothing to process. This is primarily intended for the initial step in
223 * every task that sets a safe search_path.
224 */
225 if (process_cb)
226 (*process_cb) (dbinfo, res, steps->arg);
227
228 if (steps->free_result)
229 PQclear(res);
230}
void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel)
Definition cluster.c:107
ConnStatusType PQstatus(const PGconn *conn)
char * PQerrorMessage(const PGconn *conn)
#define PQgetResult
#define PQclear
#define PQresultStatus
@ CONNECTION_BAD
Definition libpq-fe.h:85
@ PGRES_COMMAND_OK
Definition libpq-fe.h:125
@ PGRES_TUPLES_OK
Definition libpq-fe.h:128
#define pg_fatal(...)
void(* UpgradeTaskProcessCB)(DbInfo *dbinfo, PGresult *res, void *arg)
Definition pg_upgrade.h:523
static int fb(int x)
PGconn * conn
Definition task.c:107
bool free_result
Definition task.c:74
UpgradeTaskProcessCB process_cb
Definition task.c:73
void * arg
Definition task.c:75

References UpgradeTaskStep::arg, cluster(), UpgradeTaskSlot::conn, CONNECTION_BAD, UpgradeTaskSlot::db_idx, fb(), UpgradeTaskStep::free_result, pg_fatal, PGRES_COMMAND_OK, PGRES_TUPLES_OK, PQclear, PQerrorMessage(), PQgetResult, PQresultStatus, PQstatus(), UpgradeTaskStep::process_cb, and UpgradeTaskSlot::step_idx.

Referenced by process_slot().

◆ process_slot()

static void process_slot ( const ClusterInfo * cluster,
UpgradeTaskSlot * slot,
const UpgradeTask * task
)
static

Definition at line 236 of file task.c.

237{
238 PostgresPollingStatusType status;
239
240 if (!slot->ready)
241 return;
242
243 switch (slot->state)
244 {
245 case FREE:
246
247 /*
248 * If all of the databases in the cluster have been processed or
249 * are currently being processed by other slots, we are done.
250 */
251 if (dbs_processing >= cluster->dbarr.ndbs)
252 return;
253
254 /*
255 * Claim the next database in the cluster's array and initiate a
256 * new connection.
257 */
258 slot->db_idx = dbs_processing++;
259 slot->state = CONNECTING;
260 start_conn(cluster, slot);
261
262 return;
263
264 case CONNECTING:
265
266 /* Check for connection failure. */
267 status = PQconnectPoll(slot->conn);
268 if (status == PGRES_POLLING_FAILED)
269 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
270
271 /* Check whether the connection is still establishing. */
272 if (status != PGRES_POLLING_OK)
273 {
274 slot->select_mode = (status == PGRES_POLLING_READING);
275 return;
276 }
277
278 /*
279 * Move on to running/processing the queries in the task.
280 */
281 slot->state = RUNNING_QUERIES;
282 slot->select_mode = true; /* wait until ready for reading */
283 if (!PQsendQuery(slot->conn, task->queries->data))
284 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
285
286 return;
287
288 case RUNNING_QUERIES:
289
290 /*
291 * Consume any available data and clear the read-ready indicator
292 * for the connection.
293 */
294 if (!PQconsumeInput(slot->conn))
295 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
296
297 /*
298 * Process any results that are ready so that we can free up this
299 * slot for another database as soon as possible.
300 */
301 for (; slot->step_idx < task->num_steps; slot->step_idx++)
302 {
303 /* If no more results are available yet, move on. */
304 if (PQisBusy(slot->conn))
305 return;
306
307 process_query_result(cluster, slot, task);
308 }
309
310 /*
311 * If we just finished processing the result of the last step in
312 * the task, free the slot. We recursively call this function on
313 * the newly-freed slot so that we can start initiating the next
314 * connection immediately instead of waiting for the next loop
315 * through the slots.
316 */
317 dbs_complete++;
318 PQfinish(slot->conn);
319 memset(slot, 0, sizeof(UpgradeTaskSlot));
320 slot->ready = true;
321
322 process_slot(cluster, slot, task);
323
324 return;
325 }
326}
PostgresPollingStatusType PQconnectPoll(PGconn *conn)
void PQfinish(PGconn *conn)
int PQconsumeInput(PGconn *conn)
Definition fe-exec.c:2001
int PQsendQuery(PGconn *conn, const char *query)
Definition fe-exec.c:1433
int PQisBusy(PGconn *conn)
Definition fe-exec.c:2048
PostgresPollingStatusType
Definition libpq-fe.h:114
@ PGRES_POLLING_OK
Definition libpq-fe.h:118
@ PGRES_POLLING_READING
Definition libpq-fe.h:116
@ PGRES_POLLING_FAILED
Definition libpq-fe.h:115
bool select_mode
Definition task.c:109
UpgradeTaskSlotState state
Definition task.c:104
static int dbs_processing
Definition task.c:63
static int dbs_complete
Definition task.c:56
static void process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition task.c:236
static void start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
Definition task.c:174
static void process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
Definition task.c:207

References cluster(), UpgradeTaskSlot::conn, CONNECTING, UpgradeTaskSlot::db_idx, dbs_complete, dbs_processing, fb(), FREE, pg_fatal, PGRES_POLLING_FAILED, PGRES_POLLING_OK, PGRES_POLLING_READING, PQconnectPoll(), PQconsumeInput(), PQerrorMessage(), PQfinish(), PQisBusy(), PQsendQuery(), process_query_result(), process_slot(), UpgradeTaskSlot::ready, RUNNING_QUERIES, UpgradeTaskSlot::select_mode, start_conn(), UpgradeTaskSlot::state, and UpgradeTaskSlot::step_idx.

Referenced by process_slot(), and upgrade_task_run().

◆ select_loop()

static int select_loop ( int  maxFd,
fd_set * input,
fd_set * output
)
static

Definition at line 332 of file task.c.

333{
334 fd_set save_input = *input;
335 fd_set save_output = *output;
336
337 if (maxFd == 0)
338 return 0;
339
340 for (;;)
341 {
342 int i;
343
344 *input = save_input;
345 *output = save_output;
346
347 i = select(maxFd + 1, input, output, NULL, NULL);
348
349#ifndef WIN32
350 if (i < 0 && errno == EINTR)
351 continue;
352#else
353 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
354 continue;
355#endif
356 return i;
357 }
358}
FILE * input
FILE * output
int i
Definition isn.c:77
#define EINTR
Definition win32_port.h:361
#define select(n, r, w, e, timeout)
Definition win32_port.h:500

References EINTR, fb(), i, input, output, and select.

Referenced by wait_on_slots().

◆ start_conn()

static void start_conn ( const ClusterInfo * cluster,
UpgradeTaskSlot * slot
)
static

Definition at line 174 of file task.c.

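175{
176 PQExpBufferData conn_opts;
177 DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
178
179 /* Build connection string with proper quoting */
180 initPQExpBuffer(&conn_opts);
181 appendPQExpBufferStr(&conn_opts, "dbname=");
182 appendConnStrVal(&conn_opts, dbinfo->db_name);
183 appendPQExpBufferStr(&conn_opts, " user=");
184 appendConnStrVal(&conn_opts, os_info.user);
185 appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
186 if (cluster->sockdir)
187 {
188 appendPQExpBufferStr(&conn_opts, " host=");
189 appendConnStrVal(&conn_opts, cluster->sockdir);
190 }
191 if (!protocol_negotiation_supported(cluster))
192 appendPQExpBufferStr(&conn_opts, " max_protocol_version=3.0");
193
194 slot->conn = PQconnectStart(conn_opts.data);
195
196 if (!slot->conn)
197 pg_fatal("out of memory");
198
199 termPQExpBuffer(&conn_opts);
200}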
PGconn * PQconnectStart(const char *conninfo)
Definition fe-connect.c:954
OSInfo os_info
Definition pg_upgrade.c:75
bool protocol_negotiation_supported(const ClusterInfo *cluster)
Definition version.c:36
void initPQExpBuffer(PQExpBuffer str)
Definition pqexpbuffer.c:90
void appendPQExpBuffer(PQExpBuffer str, const char *fmt,...)
void appendPQExpBufferStr(PQExpBuffer str, const char *data)
void termPQExpBuffer(PQExpBuffer str)
void appendConnStrVal(PQExpBuffer buf, const char *str)
char * db_name
Definition pg_upgrade.h:206
char * user
Definition pg_upgrade.h:366

References appendConnStrVal(), appendPQExpBuffer(), appendPQExpBufferStr(), cluster(), UpgradeTaskSlot::conn, UpgradeTaskSlot::db_idx, DbInfo::db_name, fb(), initPQExpBuffer(), os_info, pg_fatal, PQconnectStart(), protocol_negotiation_supported(), termPQExpBuffer(), and OSInfo::user.

Referenced by process_slot().

◆ upgrade_task_add_step()

void upgrade_task_add_step ( UpgradeTask * task,
const char * query,
UpgradeTaskProcessCB  process_cb,
bool  free_result,
void * arg
)

◆ upgrade_task_create()

UpgradeTask * upgrade_task_create ( void  )

◆ upgrade_task_free()

◆ upgrade_task_run()

void upgrade_task_run ( const UpgradeTask * task,
const ClusterInfo * cluster
)

Definition at line 421 of file task.c.

422{
423 int jobs = Max(1, user_opts.jobs);
424 UpgradeTaskSlot *slots = pg_malloc0_array(UpgradeTaskSlot, jobs);
425
426 dbs_complete = 0;
427 dbs_processing = 0;
428
429 /*
430 * Process every slot the first time round.
431 */
432 for (int i = 0; i < jobs; i++)
433 slots[i].ready = true;
434
435 while (dbs_complete < cluster->dbarr.ndbs)
436 {
437 for (int i = 0; i < jobs; i++)
438 process_slot(cluster, &slots[i], task);
439
440 wait_on_slots(slots, jobs);
441 }
442
443 pg_free(slots);
444}
#define Max(x, y)
Definition c.h:1013
#define pg_malloc0_array(type, count)
Definition fe_memutils.h:57
UserOpts user_opts
Definition option.c:30
static void wait_on_slots(UpgradeTaskSlot *slots, int numslots)
Definition task.c:365

References cluster(), dbs_complete, dbs_processing, fb(), i, UserOpts::jobs, Max, pg_free(), pg_malloc0_array, process_slot(), user_opts, and wait_on_slots().

Referenced by check_for_data_types_usage(), check_for_gist_inet_ops(), check_for_incompatible_polymorphics(), check_for_isn_and_int8_passing_mismatch(), check_for_not_null_inheritance(), check_for_tables_with_oids(), check_for_unicode_update(), check_for_user_defined_encoding_conversions(), check_for_user_defined_postfix_ops(), check_old_cluster_subscription_state(), get_db_rel_and_slot_infos(), get_loadable_libraries(), and report_extension_updates().

◆ wait_on_slots()

static void wait_on_slots ( UpgradeTaskSlot * slots,
int  numslots
)
static

Definition at line 365 of file task.c.

366{
367 fd_set input;
368 fd_set output;
369 int maxFd = 0;
370
371 FD_ZERO(&input);
372 FD_ZERO(&output);
373
374 for (int i = 0; i < numslots; i++)
375 {
376 /*
377 * We assume the previous call to process_slot() handled everything
378 * that was marked ready in the previous call to wait_on_slots(), if
379 * any.
380 */
381 slots[i].ready = false;
382
383 /*
384 * This function should only ever see free slots as we are finishing
385 * processing the last few databases, at which point we don't have any
386 * databases left for them to process. We'll never use these slots
387 * again, so we can safely ignore them.
388 */
389 if (slots[i].state == FREE)
390 continue;
391
392 /*
393 * Add the socket to the set.
394 */
395 slots[i].sock = PQsocket(slots[i].conn);
396 if (slots[i].sock < 0)
397 pg_fatal("invalid socket");
398 FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
399 maxFd = Max(maxFd, slots[i].sock);
400 }
401
402 /*
403 * If we found socket(s) to wait on, wait.
404 */
405 if (select_loop(maxFd, &input, &output) == -1)
406 pg_fatal("%s() failed: %m", "select");
407
408 /*
409 * Mark which sockets appear to be ready.
410 */
411 for (int i = 0; i < numslots; i++)
412 slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
413 FD_ISSET(slots[i].sock, &output));
414}
int PQsocket(const PGconn *conn)
PGconn * conn
Definition streamutil.c:52
static int select_loop(int maxFd, fd_set *input, fd_set *output)
Definition task.c:332

References conn, fb(), FREE, i, input, Max, output, pg_fatal, PQsocket(), UpgradeTaskSlot::ready, select_loop(), and UpgradeTaskSlot::sock.

Referenced by upgrade_task_run().

Variable Documentation

◆ dbs_complete

int dbs_complete
static

Definition at line 56 of file task.c.

Referenced by process_slot(), and upgrade_task_run().

◆ dbs_processing

int dbs_processing
static

Definition at line 63 of file task.c.

Referenced by process_slot(), and upgrade_task_run().