PostgreSQL Source Code git master
Loading...
Searching...
No Matches
method_worker.c File Reference
#include "postgres.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
#include "utils/memdebug.h"
#include "utils/ps_status.h"
#include "utils/wait_event.h"
Include dependency graph for method_worker.c:

Go to the source code of this file.

Data Structures

struct  PgAioWorkerSubmissionQueue
 
struct  PgAioWorkerSlot
 
struct  PgAioWorkerControl
 

Macros

#define IO_WORKER_WAKEUP_FANOUT   2
 

Typedefs

typedef struct PgAioWorkerSubmissionQueue PgAioWorkerSubmissionQueue
 
typedef struct PgAioWorkerSlot PgAioWorkerSlot
 
typedef struct PgAioWorkerControl PgAioWorkerControl
 

Functions

static size_t pgaio_worker_shmem_size (void)
 
static void pgaio_worker_shmem_init (bool first_time)
 
static bool pgaio_worker_needs_synchronous_execution (PgAioHandle *ioh)
 
static int pgaio_worker_submit (uint16 num_staged_ios, PgAioHandle **staged_ios)
 
static size_t pgaio_worker_queue_shmem_size (int *queue_size)
 
static size_t pgaio_worker_control_shmem_size (void)
 
static int pgaio_worker_choose_idle (void)
 
static bool pgaio_worker_submission_queue_insert (PgAioHandle *ioh)
 
static int pgaio_worker_submission_queue_consume (void)
 
static uint32 pgaio_worker_submission_queue_depth (void)
 
static void pgaio_worker_submit_internal (int num_staged_ios, PgAioHandle **staged_ios)
 
static void pgaio_worker_die (int code, Datum arg)
 
static void pgaio_worker_register (void)
 
static void pgaio_worker_error_callback (void *arg)
 
void IoWorkerMain (const void *startup_data, size_t startup_data_len)
 
bool pgaio_workers_enabled (void)
 

Variables

const IoMethodOps pgaio_worker_ops
 
int io_workers = 3
 
static int io_worker_queue_size = 64
 
static int MyIoWorkerId
 
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
 
static PgAioWorkerControl * io_worker_control
 

Macro Definition Documentation

◆ IO_WORKER_WAKEUP_FANOUT

#define IO_WORKER_WAKEUP_FANOUT   2

Definition at line 52 of file method_worker.c.

Typedef Documentation

◆ PgAioWorkerControl

◆ PgAioWorkerSlot

◆ PgAioWorkerSubmissionQueue

Function Documentation

◆ IoWorkerMain()

void IoWorkerMain ( const void * startup_data,
size_t  startup_data_len 
)

Definition at line 396 of file method_worker.c.

397{
399 PgAioHandle *volatile error_ioh = NULL;
400 ErrorContextCallback errcallback = {0};
401 volatile int error_errno = 0;
402 char cmd[128];
403
405
407 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
408
409 /*
410 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
411 * shutdown sequence, similar to checkpointer.
412 */
414 /* SIGQUIT handler was already set up by InitPostmasterChild */
419
420 /* also registers a shutdown callback to unregister */
422
423 sprintf(cmd, "%d", MyIoWorkerId);
424 set_ps_display(cmd);
425
427 errcallback.previous = error_context_stack;
428 error_context_stack = &errcallback;
429
430 /* see PostgresMain() */
431 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
432 {
435
437
438 /*
439 * In the - very unlikely - case that the IO failed in a way that
440 * raises an error we need to mark the IO as failed.
441 *
442 * Need to do just enough error recovery so that we can mark the IO as
443 * failed and then exit (postmaster will start a new worker).
444 */
446
447 if (error_ioh != NULL)
448 {
449 /* should never fail without setting error_errno */
450 Assert(error_errno != 0);
451
453
457 }
458
459 proc_exit(1);
460 }
461
462 /* We can now handle ereport(ERROR) */
464
466
468 {
471 int nlatches = 0;
472 int nwakeups = 0;
473 int worker;
474
475 /*
476 * Try to get a job to do.
477 *
478 * The lwlock acquisition also provides the necessary memory barrier
479 * to ensure that we don't see outdated data in the handle.
480 */
483 {
484 /*
485 * Nothing to do. Mark self idle.
486 *
487 * XXX: Invent some kind of back pressure to reduce useless
488 * wakeups?
489 */
491 }
492 else
493 {
494 /* Got one. Clear idle flag. */
496
497 /* See if we can wake up some peers. */
500 for (int i = 0; i < nwakeups; ++i)
501 {
502 if ((worker = pgaio_worker_choose_idle()) < 0)
503 break;
505 }
506 }
508
509 for (int i = 0; i < nlatches; ++i)
511
512 if (io_index != -1)
513 {
515
517 error_ioh = ioh;
518 errcallback.arg = ioh;
519
521 "worker %d processing IO",
523
524 /*
525 * Prevent interrupts between pgaio_io_reopen() and
526 * pgaio_io_perform_synchronously() that otherwise could lead to
527 * the FD getting closed in that window.
528 */
530
531 /*
532 * It's very unlikely, but possible, that reopen fails. E.g. due
533 * to memory allocations failing or file permissions changing or
534 * such. In that case we need to fail the IO.
535 *
536 * There's not really a good errno we can report here.
537 */
540
541 /*
542 * To be able to exercise the reopen-fails path, allow injection
543 * points to trigger a failure at this point.
544 */
545 INJECTION_POINT("aio-worker-after-reopen", ioh);
546
547 error_errno = 0;
548 error_ioh = NULL;
549
550 /*
551 * As part of IO completion the buffer will be marked as NOACCESS,
552 * until the buffer is pinned again - which never happens in io
553 * workers. Therefore the next time there is IO for the same
554 * buffer, the memory will be considered inaccessible. To avoid
555 * that, explicitly allow access to the memory before reading data
556 * into it.
557 */
558#ifdef USE_VALGRIND
559 {
560 struct iovec *iov;
561 uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
562
563 for (int i = 0; i < iov_length; i++)
565 }
566#endif
567
568 /*
569 * We don't expect this to ever fail with ERROR or FATAL, no need
570 * to keep error_ioh set to the IO.
571 * pgaio_io_perform_synchronously() contains a critical section to
572 * ensure we don't accidentally fail.
573 */
575
577 errcallback.arg = NULL;
578 }
579 else
580 {
584 }
585
587
589 {
590 ConfigReloadPending = false;
592 }
593 }
594
595 error_context_stack = errcallback.previous;
596 proc_exit(0);
597}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition aio.c:528
PgAioCtl * pgaio_ctl
Definition aio.c:78
#define pgaio_debug_io(elevel, ioh, msg,...)
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition aio_io.c:116
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
Definition aio_io.c:219
void pgaio_io_reopen(PgAioHandle *ioh)
Definition aio_target.c:116
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:40
sigset_t UnBlockSig
Definition pqsignal.c:22
#define Min(x, y)
Definition c.h:1093
#define Assert(condition)
Definition c.h:945
uint16_t uint16
Definition c.h:617
uint32_t uint32
Definition c.h:618
void EmitErrorReport(void)
Definition elog.c:1882
ErrorContextCallback * error_context_stack
Definition elog.c:99
sigjmp_buf * PG_exception_stack
Definition elog.c:101
#define DEBUG4
Definition elog.h:27
struct Latch * MyLatch
Definition globals.c:63
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition interrupt.c:104
volatile sig_atomic_t ShutdownRequestPending
Definition interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
void LWLockReleaseAll(void)
Definition lwlock.c:1893
@ LW_EXCLUSIVE
Definition lwlock.h:112
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition memdebug.h:28
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
#define IO_WORKER_WAKEUP_FANOUT
static void pgaio_worker_register(void)
static PgAioWorkerControl * io_worker_control
static int MyIoWorkerId
static int pgaio_worker_submission_queue_consume(void)
static int pgaio_worker_choose_idle(void)
#define RESUME_INTERRUPTS()
Definition miscadmin.h:136
#define START_CRIT_SECTION()
Definition miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition miscadmin.h:134
#define END_CRIT_SECTION()
Definition miscadmin.h:152
#define die(msg)
#define pqsignal
Definition port.h:547
#define sprintf
Definition port.h:262
static int fb(int x)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:680
static void set_ps_display(const char *activity)
Definition ps_status.h:40
struct ErrorContextCallback * previous
Definition elog.h:297
void(* callback)(void *arg)
Definition elog.h:298
Definition latch.h:116
PgAioHandle * io_handles
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171

References ErrorContextCallback::arg, Assert, AuxiliaryProcessMainCommon(), ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG4, die, EmitErrorReport(), END_CRIT_SECTION, error_context_stack, fb(), HOLD_INTERRUPTS, i, PgAioWorkerControl::idle_worker_mask, INJECTION_POINT, PgAioCtl::io_handles, io_worker_control, IO_WORKER_WAKEUP_FANOUT, PgAioWorkerSlot::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWLockReleaseAll(), Min, MyIoWorkerId, MyLatch, PG_exception_stack, pgaio_ctl, pgaio_debug_io, pgaio_io_get_iovec_length(), pgaio_io_perform_synchronously(), pgaio_io_process_completion(), pgaio_io_reopen(), pgaio_worker_choose_idle(), pgaio_worker_error_callback(), pgaio_worker_register(), pgaio_worker_submission_queue_consume(), pgaio_worker_submission_queue_depth(), PGC_SIGHUP, pqsignal, ErrorContextCallback::previous, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), ResetLatch(), RESUME_INTERRUPTS, set_ps_display(), SetLatch(), ShutdownRequestPending, SIGALRM, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, sprintf, START_CRIT_SECTION, UnBlockSig, VALGRIND_MAKE_MEM_UNDEFINED, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and PgAioWorkerControl::workers.

◆ pgaio_worker_choose_idle()

static int pgaio_worker_choose_idle ( void  )
static

Definition at line 164 of file method_worker.c.

165{
166 int worker;
167
169 return -1;
170
171 /* Find the lowest bit position, and clear it. */
173 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
175
176 return worker;
177}
static int pg_rightmost_one_pos64(uint64 word)

References Assert, fb(), PgAioWorkerControl::idle_worker_mask, PgAioWorkerSlot::in_use, io_worker_control, pg_rightmost_one_pos64(), and PgAioWorkerControl::workers.

Referenced by IoWorkerMain(), and pgaio_worker_submit_internal().

◆ pgaio_worker_control_shmem_size()

static size_t pgaio_worker_control_shmem_size ( void  )
static

Definition at line 113 of file method_worker.c.

114{
115 return offsetof(PgAioWorkerControl, workers) +
117}
#define MAX_IO_WORKERS
Definition proc.h:523

References fb(), and MAX_IO_WORKERS.

Referenced by pgaio_worker_shmem_init(), and pgaio_worker_shmem_size().

◆ pgaio_worker_die()

◆ pgaio_worker_error_callback()

static void pgaio_worker_error_callback ( void * arg)
static

Definition at line 375 of file method_worker.c.

376{
377 ProcNumber owner;
379 int32 owner_pid;
381
382 if (!ioh)
383 return;
384
385 Assert(ioh->owner_procno != MyProcNumber);
387
388 owner = ioh->owner_procno;
390 owner_pid = owner_proc->pid;
391
392 errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
393}
int32_t int32
Definition c.h:614
Datum arg
Definition elog.c:1322
#define errcontext
Definition elog.h:198
ProcNumber MyProcNumber
Definition globals.c:90
@ B_IO_WORKER
Definition miscadmin.h:364
BackendType MyBackendType
Definition miscinit.c:65
#define GetPGProcByNumber(n)
Definition proc.h:501
int ProcNumber
Definition procnumber.h:24
Definition proc.h:176

References arg, Assert, B_IO_WORKER, errcontext, fb(), GetPGProcByNumber, MyBackendType, and MyProcNumber.

Referenced by IoWorkerMain().

◆ pgaio_worker_needs_synchronous_execution()

static bool pgaio_worker_needs_synchronous_execution ( PgAioHandle * ioh)
static

Definition at line 234 of file method_worker.c.

235{
236 return
240}
@ PGAIO_HF_REFERENCES_LOCAL
Definition aio.h:60
bool pgaio_io_can_reopen(PgAioHandle *ioh)
Definition aio_target.c:103
bool IsUnderPostmaster
Definition globals.c:120

References fb(), IsUnderPostmaster, PGAIO_HF_REFERENCES_LOCAL, and pgaio_io_can_reopen().

Referenced by pgaio_worker_submit_internal().

◆ pgaio_worker_queue_shmem_size()

static size_t pgaio_worker_queue_shmem_size ( int * queue_size)
static

Definition at line 103 of file method_worker.c.

104{
105 /* Round size up to next power of two so we can make a mask. */
107
109 sizeof(int) * *queue_size;
110}
static int io_worker_queue_size
static uint32 pg_nextpower2_32(uint32 num)

References fb(), io_worker_queue_size, and pg_nextpower2_32().

Referenced by pgaio_worker_shmem_init(), and pgaio_worker_shmem_size().

◆ pgaio_worker_register()

static void pgaio_worker_register ( void  )
static

Definition at line 341 of file method_worker.c.

342{
343 MyIoWorkerId = -1;
344
345 /*
346 * XXX: This could do with more fine-grained locking. But it's also not
347 * very common for the number of workers to change at the moment...
348 */
350
351 for (int i = 0; i < MAX_IO_WORKERS; ++i)
352 {
354 {
357 MyIoWorkerId = i;
358 break;
359 }
360 else
362 }
363
364 if (MyIoWorkerId == -1)
365 elog(ERROR, "couldn't find a free worker slot");
366
370
372}
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void pgaio_worker_die(int code, Datum arg)

References Assert, elog, ERROR, fb(), i, PgAioWorkerControl::idle_worker_mask, PgAioWorkerSlot::in_use, io_worker_control, PgAioWorkerSlot::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAX_IO_WORKERS, MyIoWorkerId, MyLatch, on_shmem_exit(), pgaio_worker_die(), and PgAioWorkerControl::workers.

Referenced by IoWorkerMain().

◆ pgaio_worker_shmem_init()

static void pgaio_worker_shmem_init ( bool  first_time)
static

Definition at line 132 of file method_worker.c.

133{
134 bool found;
135 int queue_size;
136
138 ShmemInitStruct("AioWorkerSubmissionQueue",
140 &found);
141 if (!found)
142 {
143 io_worker_submission_queue->size = queue_size;
146 }
147
149 ShmemInitStruct("AioWorkerControl",
151 &found);
152 if (!found)
153 {
155 for (int i = 0; i < MAX_IO_WORKERS; ++i)
156 {
159 }
160 }
161}
static size_t pgaio_worker_control_shmem_size(void)
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381

References fb(), PgAioWorkerSubmissionQueue::head, i, PgAioWorkerControl::idle_worker_mask, PgAioWorkerSlot::in_use, io_worker_control, io_worker_submission_queue, PgAioWorkerSlot::latch, MAX_IO_WORKERS, pgaio_worker_control_shmem_size(), pgaio_worker_queue_shmem_size(), ShmemInitStruct(), PgAioWorkerSubmissionQueue::size, PgAioWorkerSubmissionQueue::tail, and PgAioWorkerControl::workers.

◆ pgaio_worker_shmem_size()

static size_t pgaio_worker_shmem_size ( void  )
static

Definition at line 120 of file method_worker.c.

121{
122 size_t sz;
123 int queue_size;
124
125 sz = pgaio_worker_queue_shmem_size(&queue_size);
127
128 return sz;
129}
Size add_size(Size s1, Size s2)
Definition shmem.c:485

References add_size(), fb(), pgaio_worker_control_shmem_size(), and pgaio_worker_queue_shmem_size().

◆ pgaio_worker_submission_queue_consume()

static int pgaio_worker_submission_queue_consume ( void  )
static

Definition at line 201 of file method_worker.c.

202{
204 int result;
205
207 if (queue->tail == queue->head)
208 return -1; /* empty */
209
210 result = queue->sqes[queue->tail];
211 queue->tail = (queue->tail + 1) & (queue->size - 1);
212
213 return result;
214}
int sqes[FLEXIBLE_ARRAY_MEMBER]

References PgAioWorkerSubmissionQueue::head, io_worker_submission_queue, PgAioWorkerSubmissionQueue::size, PgAioWorkerSubmissionQueue::sqes, and PgAioWorkerSubmissionQueue::tail.

Referenced by IoWorkerMain().

◆ pgaio_worker_submission_queue_depth()

static uint32 pgaio_worker_submission_queue_depth ( void  )
static

Definition at line 217 of file method_worker.c.

218{
219 uint32 head;
220 uint32 tail;
221
224
225 if (tail > head)
227
228 Assert(head >= tail);
229
230 return head - tail;
231}

References Assert, PgAioWorkerSubmissionQueue::head, io_worker_submission_queue, PgAioWorkerSubmissionQueue::size, and PgAioWorkerSubmissionQueue::tail.

Referenced by IoWorkerMain().

◆ pgaio_worker_submission_queue_insert()

static bool pgaio_worker_submission_queue_insert ( PgAioHandle * ioh)
static

Definition at line 180 of file method_worker.c.

181{
184
186 new_head = (queue->head + 1) & (queue->size - 1);
187 if (new_head == queue->tail)
188 {
189 pgaio_debug(DEBUG3, "io queue is full, at %u elements",
191 return false; /* full */
192 }
193
194 queue->sqes[queue->head] = pgaio_io_get_id(ioh);
195 queue->head = new_head;
196
197 return true;
198}
int pgaio_io_get_id(PgAioHandle *ioh)
Definition aio.c:342
#define pgaio_debug(elevel, msg,...)
#define DEBUG3
Definition elog.h:28

References DEBUG3, fb(), PgAioWorkerSubmissionQueue::head, io_worker_submission_queue, pgaio_debug, pgaio_io_get_id(), PgAioWorkerSubmissionQueue::size, PgAioWorkerSubmissionQueue::sqes, and PgAioWorkerSubmissionQueue::tail.

Referenced by pgaio_worker_submit_internal().

◆ pgaio_worker_submit()

static int pgaio_worker_submit ( uint16  num_staged_ios,
PgAioHandle **  staged_ios 
)
static

Definition at line 305 of file method_worker.c.

306{
307 for (int i = 0; i < num_staged_ios; i++)
308 {
309 PgAioHandle *ioh = staged_ios[i];
310
312 }
313
314 pgaio_worker_submit_internal(num_staged_ios, staged_ios);
315
316 return num_staged_ios;
317}
void pgaio_io_prepare_submit(PgAioHandle *ioh)
Definition aio.c:510
static void pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)

References fb(), i, pgaio_io_prepare_submit(), and pgaio_worker_submit_internal().

◆ pgaio_worker_submit_internal()

static void pgaio_worker_submit_internal ( int  num_staged_ios,
PgAioHandle **  staged_ios 
)
static

Definition at line 243 of file method_worker.c.

244{
246 int nsync = 0;
247 Latch *wakeup = NULL;
248 int worker;
249
250 Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
251
253 {
254 for (int i = 0; i < num_staged_ios; ++i)
255 {
257 if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
258 {
259 /*
260 * Do the rest synchronously. If the queue is full, give up
261 * and do the rest synchronously. We're holding an exclusive
262 * lock on the queue so nothing can consume entries.
263 */
264 synchronous_ios = &staged_ios[i];
265 nsync = (num_staged_ios - i);
266
267 break;
268 }
269
270 if (wakeup == NULL)
271 {
272 /* Choose an idle worker to wake up if we haven't already. */
273 worker = pgaio_worker_choose_idle();
274 if (worker >= 0)
276
277 pgaio_debug_io(DEBUG4, staged_ios[i],
278 "choosing worker %d",
279 worker);
280 }
281 }
283 }
284 else
285 {
286 /* do everything synchronously, no wakeup needed */
287 synchronous_ios = staged_ios;
288 nsync = num_staged_ios;
289 }
290
291 if (wakeup)
293
294 /* Run whatever is left synchronously. */
295 if (nsync > 0)
296 {
297 for (int i = 0; i < nsync; ++i)
298 {
300 }
301 }
302}
#define PGAIO_SUBMIT_BATCH_SIZE
bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1348
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]

References Assert, DEBUG4, fb(), i, io_worker_control, PgAioWorkerSlot::latch, LW_EXCLUSIVE, LWLockConditionalAcquire(), LWLockRelease(), pgaio_debug_io, pgaio_io_perform_synchronously(), PGAIO_SUBMIT_BATCH_SIZE, pgaio_worker_choose_idle(), pgaio_worker_needs_synchronous_execution(), pgaio_worker_submission_queue_insert(), SetLatch(), wakeup, and PgAioWorkerControl::workers.

Referenced by pgaio_worker_submit().

◆ pgaio_workers_enabled()

bool pgaio_workers_enabled ( void  )

Definition at line 600 of file method_worker.c.

601{
602 return io_method == IOMETHOD_WORKER;
603}
int io_method
Definition aio.c:74
@ IOMETHOD_WORKER
Definition aio.h:35

References io_method, and IOMETHOD_WORKER.

Referenced by maybe_adjust_io_workers().

Variable Documentation

◆ io_worker_control

◆ io_worker_queue_size

int io_worker_queue_size = 64
static

Definition at line 96 of file method_worker.c.

Referenced by pgaio_worker_queue_shmem_size().

◆ io_worker_submission_queue

◆ io_workers

int io_workers = 3

Definition at line 93 of file method_worker.c.

Referenced by maybe_adjust_io_workers().

◆ MyIoWorkerId

int MyIoWorkerId
static

Definition at line 97 of file method_worker.c.

Referenced by IoWorkerMain(), pgaio_worker_die(), and pgaio_worker_register().

◆ pgaio_worker_ops

const IoMethodOps pgaio_worker_ops
Initial value:
= {
.shmem_size = pgaio_worker_shmem_size,
.shmem_init = pgaio_worker_shmem_init,
.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
.submit = pgaio_worker_submit,
}
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static size_t pgaio_worker_shmem_size(void)
static void pgaio_worker_shmem_init(bool first_time)

Definition at line 83 of file method_worker.c.

83 {
84 .shmem_size = pgaio_worker_shmem_size,
85 .shmem_init = pgaio_worker_shmem_init,
86
87 .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
88 .submit = pgaio_worker_submit,
89};