PostgreSQL Source Code git master
Loading...
Searching...
No Matches
io_worker.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

pg_noreturn void IoWorkerMain (const void *startup_data, size_t startup_data_len)
 

Variables

PGDLLIMPORT int io_workers
 

Function Documentation

◆ IoWorkerMain()

pg_noreturn void IoWorkerMain ( const void * startup_data,
size_t  startup_data_len 
)
extern

Definition at line 385 of file method_worker.c.

386{
388 PgAioHandle *volatile error_ioh = NULL;
389 ErrorContextCallback errcallback = {0};
390 volatile int error_errno = 0;
391 char cmd[128];
392
395
397 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
398
399 /*
400 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
401 * shutdown sequence, similar to checkpointer.
402 */
404 /* SIGQUIT handler was already set up by InitPostmasterChild */
409
410 /* also registers a shutdown callback to unregister */
412
413 sprintf(cmd, "%d", MyIoWorkerId);
414 set_ps_display(cmd);
415
417 errcallback.previous = error_context_stack;
418 error_context_stack = &errcallback;
419
420 /* see PostgresMain() */
421 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
422 {
425
427
428 /*
429 * In the - very unlikely - case that the IO failed in a way that
430 * raises an error we need to mark the IO as failed.
431 *
432 * Need to do just enough error recovery so that we can mark the IO as
433 * failed and then exit (postmaster will start a new worker).
434 */
436
437 if (error_ioh != NULL)
438 {
439 /* should never fail without setting error_errno */
440 Assert(error_errno != 0);
441
443
447 }
448
449 proc_exit(1);
450 }
451
452 /* We can now handle ereport(ERROR) */
454
456
458 {
461 int nlatches = 0;
462 int nwakeups = 0;
463 int worker;
464
465 /*
466 * Try to get a job to do.
467 *
468 * The lwlock acquisition also provides the necessary memory barrier
469 * to ensure that we don't see outdated data in the handle.
470 */
473 {
474 /*
475 * Nothing to do. Mark self idle.
476 *
477 * XXX: Invent some kind of back pressure to reduce useless
478 * wakeups?
479 */
481 }
482 else
483 {
484 /* Got one. Clear idle flag. */
486
487 /* See if we can wake up some peers. */
490 for (int i = 0; i < nwakeups; ++i)
491 {
492 if ((worker = pgaio_worker_choose_idle()) < 0)
493 break;
495 }
496 }
498
499 for (int i = 0; i < nlatches; ++i)
501
502 if (io_index != -1)
503 {
505
507 error_ioh = ioh;
508 errcallback.arg = ioh;
509
511 "worker %d processing IO",
513
514 /*
515 * Prevent interrupts between pgaio_io_reopen() and
516 * pgaio_io_perform_synchronously() that otherwise could lead to
517 * the FD getting closed in that window.
518 */
520
521 /*
522 * It's very unlikely, but possible, that reopen fails. E.g. due
523 * to memory allocations failing or file permissions changing or
524 * such. In that case we need to fail the IO.
525 *
526 * There's not really a good errno we can report here.
527 */
530
531 /*
532 * To be able to exercise the reopen-fails path, allow injection
533 * points to trigger a failure at this point.
534 */
535 INJECTION_POINT("aio-worker-after-reopen", ioh);
536
537 error_errno = 0;
538 error_ioh = NULL;
539
540 /*
541 * As part of IO completion the buffer will be marked as NOACCESS,
542 * until the buffer is pinned again - which never happens in io
543 * workers. Therefore the next time there is IO for the same
544 * buffer, the memory will be considered inaccessible. To avoid
545 * that, explicitly allow access to the memory before reading data
546 * into it.
547 */
548#ifdef USE_VALGRIND
549 {
550 struct iovec *iov;
551 uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
552
553 for (int i = 0; i < iov_length; i++)
555 }
556#endif
557
558 /*
559 * We don't expect this to ever fail with ERROR or FATAL, no need
560 * to keep error_ioh set to the IO.
561 * pgaio_io_perform_synchronously() contains a critical section to
562 * ensure we don't accidentally fail.
563 */
565
567 errcallback.arg = NULL;
568 }
569 else
570 {
574 }
575
577
579 {
580 ConfigReloadPending = false;
582 }
583 }
584
585 error_context_stack = errcallback.previous;
586 proc_exit(0);
587}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition aio.c:528
PgAioCtl * pgaio_ctl
Definition aio.c:78
#define pgaio_debug_io(elevel, ioh, msg,...)
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition aio_io.c:116
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
Definition aio_io.c:219
void pgaio_io_reopen(PgAioHandle *ioh)
Definition aio_target.c:116
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:39
sigset_t UnBlockSig
Definition pqsignal.c:22
#define Min(x, y)
Definition c.h:997
#define Assert(condition)
Definition c.h:873
uint16_t uint16
Definition c.h:545
uint32_t uint32
Definition c.h:546
void EmitErrorReport(void)
Definition elog.c:1704
ErrorContextCallback * error_context_stack
Definition elog.c:95
sigjmp_buf * PG_exception_stack
Definition elog.c:97
#define DEBUG4
Definition elog.h:27
struct Latch * MyLatch
Definition globals.c:63
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition interrupt.c:104
volatile sig_atomic_t ShutdownRequestPending
Definition interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1176
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1793
void LWLockReleaseAll(void)
Definition lwlock.c:1892
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition memdebug.h:28
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
#define IO_WORKER_WAKEUP_FANOUT
static void pgaio_worker_register(void)
static PgAioWorkerControl * io_worker_control
static int MyIoWorkerId
static int pgaio_worker_submission_queue_consume(void)
static int pgaio_worker_choose_idle(void)
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define START_CRIT_SECTION()
Definition miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
@ B_IO_WORKER
Definition miscadmin.h:364
#define END_CRIT_SECTION()
Definition miscadmin.h:152
BackendType MyBackendType
Definition miscinit.c:64
#define die(msg)
#define pqsignal
Definition port.h:547
#define sprintf
Definition port.h:262
static int fb(int x)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:677
static void set_ps_display(const char *activity)
Definition ps_status.h:40
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
Latch
Definition latch.h:114
PgAioHandle * io_handles
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171

References ErrorContextCallback::arg, Assert, AuxiliaryProcessMainCommon(), B_IO_WORKER, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG4, die, EmitErrorReport(), END_CRIT_SECTION, error_context_stack, fb(), HOLD_INTERRUPTS, i, PgAioWorkerControl::idle_worker_mask, INJECTION_POINT, PgAioCtl::io_handles, io_worker_control, IO_WORKER_WAKEUP_FANOUT, PgAioWorkerSlot::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWLockReleaseAll(), Min, MyBackendType, MyIoWorkerId, MyLatch, PG_exception_stack, pgaio_ctl, pgaio_debug_io, pgaio_io_get_iovec_length(), pgaio_io_perform_synchronously(), pgaio_io_process_completion(), pgaio_io_reopen(), pgaio_worker_choose_idle(), pgaio_worker_error_callback(), pgaio_worker_register(), pgaio_worker_submission_queue_consume(), pgaio_worker_submission_queue_depth(), PGC_SIGHUP, pqsignal, ErrorContextCallback::previous, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), ResetLatch(), RESUME_INTERRUPTS, set_ps_display(), SetLatch(), ShutdownRequestPending, SIGALRM, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, sprintf, START_CRIT_SECTION, UnBlockSig, VALGRIND_MAKE_MEM_UNDEFINED, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and PgAioWorkerControl::workers.

Variable Documentation

◆ io_workers

PGDLLIMPORT int io_workers
extern

Definition at line 93 of file method_worker.c.

Referenced by maybe_adjust_io_workers().