PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
io_worker.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

pg_noreturn void IoWorkerMain (const void *startup_data, size_t startup_data_len)
 

Variables

PGDLLIMPORT int io_workers
 

Function Documentation

◆ IoWorkerMain()

pg_noreturn void IoWorkerMain ( const void *  startup_data,
size_t  startup_data_len 
)

Definition at line 361 of file method_worker.c.

362{
363 sigjmp_buf local_sigjmp_buf;
364 PgAioHandle *volatile error_ioh = NULL;
365 volatile int error_errno = 0;
366 char cmd[128];
367
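368 MyBackendType = B_IO_WORKER;
369 AuxiliaryProcessMainCommon();
370
371 pqsignal(SIGHUP, SignalHandlerForConfigReload);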
372 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
373
374 /*
375 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
376 * shutdown sequence, similar to checkpointer.
377 */
378 pqsignal(SIGTERM, SIG_IGN);
379 /* SIGQUIT handler was already set up by InitPostmasterChild */
380 pqsignal(SIGALRM, SIG_IGN);
381 pqsignal(SIGPIPE, SIG_IGN);
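382 pqsignal(SIGUSR1, procsignal_sigusr1_handler);
383 pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
384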
385 /* also registers a shutdown callback to unregister */
386 pgaio_worker_register();
387
388 sprintf(cmd, "%d", MyIoWorkerId);
389 set_ps_display(cmd);
390
391 /* see PostgresMain() */
392 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
393 {
394 error_context_stack = NULL;
395 HOLD_INTERRUPTS();
396
397 EmitErrorReport();
398
399 /*
400 * In the - very unlikely - case that the IO failed in a way that
401 * raises an error we need to mark the IO as failed.
402 *
403 * Need to do just enough error recovery so that we can mark the IO as
404 * failed and then exit (postmaster will start a new worker).
405 */
406 LWLockReleaseAll();
407
408 if (error_ioh != NULL)
409 {
410 /* should never fail without setting error_errno */
411 Assert(error_errno != 0);
412
413 errno = error_errno;
414
415 START_CRIT_SECTION();
416 pgaio_io_process_completion(error_ioh, -error_errno);
417 END_CRIT_SECTION();
418 }
419
420 proc_exit(1);
421 }
422
423 /* We can now handle ereport(ERROR) */
424 PG_exception_stack = &local_sigjmp_buf;
425
426 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
427
428 while (!ShutdownRequestPending)
429 {
430 uint32 io_index;
431 Latch *latches[IO_WORKER_WAKEUP_FANOUT];
432 int nlatches = 0;
433 int nwakeups = 0;
434 int worker;
435
436 /* Try to get a job to do. */
437 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
438 if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
439 {
440 /*
441 * Nothing to do. Mark self idle.
442 *
443 * XXX: Invent some kind of back pressure to reduce useless
444 * wakeups?
445 */
446 io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
447 }
448 else
449 {
450 /* Got one. Clear idle flag. */
451 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
452
453 /* See if we can wake up some peers. */
454 nwakeups = Min(pgaio_worker_submission_queue_depth(),
455 IO_WORKER_WAKEUP_FANOUT);
456 for (int i = 0; i < nwakeups; ++i)
457 {
458 if ((worker = pgaio_choose_idle_worker()) < 0)
459 break;
460 latches[nlatches++] = io_worker_control->workers[worker].latch;
461 }
462 }
463 LWLockRelease(AioWorkerSubmissionQueueLock);
464
465 for (int i = 0; i < nlatches; ++i)
466 SetLatch(latches[i]);
467
468 if (io_index != UINT32_MAX)
469 {
470 PgAioHandle *ioh = NULL;
471
472 ioh = &pgaio_ctl->io_handles[io_index];
473 error_ioh = ioh;
474
475 pgaio_debug_io(DEBUG4, ioh,
476 "worker %d processing IO",
477 MyIoWorkerId);
478
479 /*
480 * It's very unlikely, but possible, that reopen fails. E.g. due
481 * to memory allocations failing or file permissions changing or
482 * such. In that case we need to fail the IO.
483 *
484 * There's not really a good errno we can report here.
485 */
486 error_errno = ENOENT;
487 pgaio_io_reopen(ioh);
488
489 /*
490 * To be able to exercise the reopen-fails path, allow injection
491 * points to trigger a failure at this point.
492 */
493 pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN");
494
495 error_errno = 0;
496 error_ioh = NULL;
497
498 /*
499 * We don't expect this to ever fail with ERROR or FATAL, no need
500 * to keep error_ioh set to the IO.
501 * pgaio_io_perform_synchronously() contains a critical section to
502 * ensure we don't accidentally fail.
503 */
504 pgaio_io_perform_synchronously(ioh);
505 }
506 else
507 {
508 WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
509 WAIT_EVENT_IO_WORKER_MAIN);
510 ResetLatch(MyLatch);
511 }
512
513 CHECK_FOR_INTERRUPTS();
514 }
515
516 proc_exit(0);
517}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition: aio.c:492
PgAioCtl * pgaio_ctl
Definition: aio.c:76
#define pgaio_io_call_inj(ioh, injection_point)
Definition: aio_internal.h:376
#define pgaio_debug_io(elevel, ioh, msg,...)
Definition: aio_internal.h:358
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition: aio_io.c:116
void pgaio_io_reopen(PgAioHandle *ioh)
Definition: aio_target.c:108
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
#define Min(x, y)
Definition: c.h:975
uint32_t uint32
Definition: c.h:502
void EmitErrorReport(void)
Definition: elog.c:1687
ErrorContextCallback * error_context_stack
Definition: elog.c:94
sigjmp_buf * PG_exception_stack
Definition: elog.c:96
#define DEBUG4
Definition: elog.h:27
struct Latch * MyLatch
Definition: globals.c:62
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:74
void SetLatch(Latch *latch)
Definition: latch.c:288
void ResetLatch(Latch *latch)
Definition: latch.c:372
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1179
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1899
void LWLockReleaseAll(void)
Definition: lwlock.c:1950
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static uint32 pgaio_worker_submission_queue_depth(void)
static int pgaio_choose_idle_worker(void)
#define IO_WORKER_WAKEUP_FANOUT
Definition: method_worker.c:50
static void pgaio_worker_register(void)
static int MyIoWorkerId
Definition: method_worker.c:96
static uint32 pgaio_worker_submission_queue_consume(void)
static AioWorkerControl * io_worker_control
Definition: method_worker.c:98
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
@ B_IO_WORKER
Definition: miscadmin.h:363
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
BackendType MyBackendType
Definition: miscinit.c:64
#define die(msg)
#define pqsignal
Definition: port.h:521
#define sprintf
Definition: port.h:241
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:673
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
uint64 idle_worker_mask
Definition: method_worker.c:70
AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
Definition: method_worker.c:71
Latch
Definition: latch.h:114
PgAioHandle * io_handles
Definition: aio_internal.h:241
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171

References Assert(), AuxiliaryProcessMainCommon(), B_IO_WORKER, CHECK_FOR_INTERRUPTS, DEBUG4, die, EmitErrorReport(), END_CRIT_SECTION, error_context_stack, HOLD_INTERRUPTS, i, AioWorkerControl::idle_worker_mask, PgAioCtl::io_handles, io_worker_control, IO_WORKER_WAKEUP_FANOUT, AioWorkerSlot::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWLockReleaseAll(), Min, MyBackendType, MyIoWorkerId, MyLatch, PG_exception_stack, pgaio_choose_idle_worker(), pgaio_ctl, pgaio_debug_io, pgaio_io_call_inj, pgaio_io_perform_synchronously(), pgaio_io_process_completion(), pgaio_io_reopen(), pgaio_worker_register(), pgaio_worker_submission_queue_consume(), pgaio_worker_submission_queue_depth(), pqsignal, proc_exit(), procsignal_sigusr1_handler(), ResetLatch(), set_ps_display(), SetLatch(), ShutdownRequestPending, SIGALRM, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, sprintf, START_CRIT_SECTION, UnBlockSig, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and AioWorkerControl::workers.

Variable Documentation

◆ io_workers

PGDLLIMPORT int io_workers
extern

Definition at line 92 of file method_worker.c.

Referenced by maybe_adjust_io_workers().