50#define IO_WORKER_WAKEUP_FANOUT 2
108 sizeof(
uint32) * *queue_size;
184 new_head = (queue->
head + 1) & (queue->
size - 1);
185 if (new_head == queue->
tail)
193 queue->
head = new_head;
208 result = queue->
ios[queue->
tail];
251 for (
int i = 0;
i < nios; ++
i)
260 synchronous_ios[nsync++] = ios[
i];
272 "choosing worker %d",
284 for (
int i = 0;
i < nsync; ++
i)
294 for (
int i = 0;
i < num_staged_ios;
i++)
303 return num_staged_ios;
351 elog(
ERROR,
"couldn't find a free worker slot");
363 sigjmp_buf local_sigjmp_buf;
365 volatile int error_errno = 0;
392 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
408 if (error_ioh != NULL)
456 for (
int i = 0;
i < nwakeups; ++
i)
465 for (
int i = 0;
i < nlatches; ++
i)
468 if (io_index != UINT32_MAX)
476 "worker %d processing IO",
493 error_errno = ENOENT;
518 WAIT_EVENT_IO_WORKER_MAIN);
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
int pgaio_io_get_id(PgAioHandle *ioh)
void pgaio_io_prepare_submit(PgAioHandle *ioh)
@ PGAIO_HF_REFERENCES_LOCAL
#define pgaio_io_call_inj(ioh, injection_point)
#define pgaio_debug(elevel, msg,...)
#define pgaio_debug_io(elevel, ioh, msg,...)
#define PGAIO_SUBMIT_BATCH_SIZE
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
void pgaio_io_reopen(PgAioHandle *ioh)
bool pgaio_io_can_reopen(PgAioHandle *ioh)
void AuxiliaryProcessMainCommon(void)
#define FLEXIBLE_ARRAY_MEMBER
void EmitErrorReport(void)
ErrorContextCallback * error_context_stack
sigjmp_buf * PG_exception_stack
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
void LWLockReleaseAll(void)
static size_t pgaio_worker_control_shmem_size(void)
static uint32 pgaio_worker_submission_queue_depth(void)
struct AioWorkerControl AioWorkerControl
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static int pgaio_choose_idle_worker(void)
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
#define IO_WORKER_WAKEUP_FANOUT
static size_t pgaio_worker_shmem_size(void)
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static AioWorkerSubmissionQueue * io_worker_submission_queue
static int io_worker_queue_size
static void pgaio_worker_register(void)
struct AioWorkerSubmissionQueue AioWorkerSubmissionQueue
const IoMethodOps pgaio_worker_ops
static void pgaio_worker_die(int code, Datum arg)
struct AioWorkerSlot AioWorkerSlot
static uint32 pgaio_worker_submission_queue_consume(void)
static void pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
bool pgaio_workers_enabled(void)
void IoWorkerMain(const void *startup_data, size_t startup_data_len)
static AioWorkerControl * io_worker_control
static void pgaio_worker_shmem_init(bool first_time)
#define RESUME_INTERRUPTS()
#define START_CRIT_SECTION()
#define CHECK_FOR_INTERRUPTS()
#define HOLD_INTERRUPTS()
#define END_CRIT_SECTION()
BackendType MyBackendType
static int pg_rightmost_one_pos64(uint64 word)
static uint32 pg_nextpower2_32(uint32 num)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
static void set_ps_display(const char *activity)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
uint32 ios[FLEXIBLE_ARRAY_MEMBER]
size_t(* shmem_size)(void)
#define WL_EXIT_ON_PM_DEATH
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]