PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
method_worker.c File Reference
#include "postgres.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/memdebug.h"
#include "utils/ps_status.h"
#include "utils/wait_event.h"
Include dependency graph for method_worker.c:

Go to the source code of this file.

Data Structures

struct  AioWorkerSubmissionQueue
 
struct  AioWorkerSlot
 
struct  AioWorkerControl
 

Macros

#define IO_WORKER_WAKEUP_FANOUT   2
 

Typedefs

typedef struct AioWorkerSubmissionQueue AioWorkerSubmissionQueue
 
typedef struct AioWorkerSlot AioWorkerSlot
 
typedef struct AioWorkerControl AioWorkerControl
 

Functions

static size_t pgaio_worker_shmem_size (void)
 
static void pgaio_worker_shmem_init (bool first_time)
 
static bool pgaio_worker_needs_synchronous_execution (PgAioHandle *ioh)
 
static int pgaio_worker_submit (uint16 num_staged_ios, PgAioHandle **staged_ios)
 
static size_t pgaio_worker_queue_shmem_size (int *queue_size)
 
static size_t pgaio_worker_control_shmem_size (void)
 
static int pgaio_choose_idle_worker (void)
 
static bool pgaio_worker_submission_queue_insert (PgAioHandle *ioh)
 
static uint32 pgaio_worker_submission_queue_consume (void)
 
static uint32 pgaio_worker_submission_queue_depth (void)
 
static void pgaio_worker_submit_internal (int nios, PgAioHandle *ios[])
 
static void pgaio_worker_die (int code, Datum arg)
 
static void pgaio_worker_register (void)
 
static void pgaio_worker_error_callback (void *arg)
 
void IoWorkerMain (const void *startup_data, size_t startup_data_len)
 
bool pgaio_workers_enabled (void)
 

Variables

const IoMethodOps pgaio_worker_ops
 
int io_workers = 3
 
static int io_worker_queue_size = 64
 
static int MyIoWorkerId
 
static AioWorkerSubmissionQueueio_worker_submission_queue
 
static AioWorkerControlio_worker_control
 

Macro Definition Documentation

◆ IO_WORKER_WAKEUP_FANOUT

#define IO_WORKER_WAKEUP_FANOUT   2

Definition at line 51 of file method_worker.c.

Typedef Documentation

◆ AioWorkerControl

◆ AioWorkerSlot

typedef struct AioWorkerSlot AioWorkerSlot

◆ AioWorkerSubmissionQueue

Function Documentation

◆ IoWorkerMain()

void IoWorkerMain ( const void *  startup_data,
size_t  startup_data_len 
)

Definition at line 383 of file method_worker.c.

384{
385 sigjmp_buf local_sigjmp_buf;
386 PgAioHandle *volatile error_ioh = NULL;
387 ErrorContextCallback errcallback = {0};
388 volatile int error_errno = 0;
389 char cmd[128];
390
393
395 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
396
397 /*
398 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
399 * shutdown sequence, similar to checkpointer.
400 */
401 pqsignal(SIGTERM, SIG_IGN);
402 /* SIGQUIT handler was already set up by InitPostmasterChild */
403 pqsignal(SIGALRM, SIG_IGN);
404 pqsignal(SIGPIPE, SIG_IGN);
407
408 /* also registers a shutdown callback to unregister */
410
411 sprintf(cmd, "%d", MyIoWorkerId);
412 set_ps_display(cmd);
413
415 errcallback.previous = error_context_stack;
416 error_context_stack = &errcallback;
417
418 /* see PostgresMain() */
419 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
420 {
421 error_context_stack = NULL;
423
425
426 /*
427 * In the - very unlikely - case that the IO failed in a way that
428 * raises an error we need to mark the IO as failed.
429 *
430 * Need to do just enough error recovery so that we can mark the IO as
431 * failed and then exit (postmaster will start a new worker).
432 */
434
435 if (error_ioh != NULL)
436 {
437 /* should never fail without setting error_errno */
438 Assert(error_errno != 0);
439
440 errno = error_errno;
441
443 pgaio_io_process_completion(error_ioh, -error_errno);
445 }
446
447 proc_exit(1);
448 }
449
450 /* We can now handle ereport(ERROR) */
451 PG_exception_stack = &local_sigjmp_buf;
452
453 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
454
456 {
457 uint32 io_index;
459 int nlatches = 0;
460 int nwakeups = 0;
461 int worker;
462
463 /* Try to get a job to do. */
464 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
465 if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
466 {
467 /*
468 * Nothing to do. Mark self idle.
469 *
470 * XXX: Invent some kind of back pressure to reduce useless
471 * wakeups?
472 */
474 }
475 else
476 {
477 /* Got one. Clear idle flag. */
478 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
479
480 /* See if we can wake up some peers. */
483 for (int i = 0; i < nwakeups; ++i)
484 {
485 if ((worker = pgaio_choose_idle_worker()) < 0)
486 break;
487 latches[nlatches++] = io_worker_control->workers[worker].latch;
488 }
489 }
490 LWLockRelease(AioWorkerSubmissionQueueLock);
491
492 for (int i = 0; i < nlatches; ++i)
493 SetLatch(latches[i]);
494
495 if (io_index != UINT32_MAX)
496 {
497 PgAioHandle *ioh = NULL;
498
499 ioh = &pgaio_ctl->io_handles[io_index];
500 error_ioh = ioh;
501 errcallback.arg = ioh;
502
504 "worker %d processing IO",
506
507 /*
508 * Prevent interrupts between pgaio_io_reopen() and
509 * pgaio_io_perform_synchronously() that otherwise could lead to
510 * the FD getting closed in that window.
511 */
513
514 /*
515 * It's very unlikely, but possible, that reopen fails. E.g. due
516 * to memory allocations failing or file permissions changing or
517 * such. In that case we need to fail the IO.
518 *
519 * There's not really a good errno we can report here.
520 */
521 error_errno = ENOENT;
522 pgaio_io_reopen(ioh);
523
524 /*
525 * To be able to exercise the reopen-fails path, allow injection
526 * points to trigger a failure at this point.
527 */
528 pgaio_io_call_inj(ioh, "aio-worker-after-reopen");
529
530 error_errno = 0;
531 error_ioh = NULL;
532
533 /*
534 * As part of IO completion the buffer will be marked as NOACCESS,
535 * until the buffer is pinned again - which never happens in io
536 * workers. Therefore the next time there is IO for the same
537 * buffer, the memory will be considered inaccessible. To avoid
538 * that, explicitly allow access to the memory before reading data
539 * into it.
540 */
541#ifdef USE_VALGRIND
542 {
543 struct iovec *iov;
544 uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
545
546 for (int i = 0; i < iov_length; i++)
547 VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
548 }
549#endif
550
551 /*
552 * We don't expect this to ever fail with ERROR or FATAL, no need
553 * to keep error_ioh set to the IO.
554 * pgaio_io_perform_synchronously() contains a critical section to
555 * ensure we don't accidentally fail.
556 */
558
560 errcallback.arg = NULL;
561 }
562 else
563 {
565 WAIT_EVENT_IO_WORKER_MAIN);
567 }
568
570 }
571
572 error_context_stack = errcallback.previous;
573 proc_exit(0);
574}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition: aio.c:500
PgAioCtl * pgaio_ctl
Definition: aio.c:81
#define pgaio_io_call_inj(ioh, injection_point)
Definition: aio_internal.h:407
#define pgaio_debug_io(elevel, ioh, msg,...)
Definition: aio_internal.h:389
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition: aio_io.c:116
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
Definition: aio_io.c:219
void pgaio_io_reopen(PgAioHandle *ioh)
Definition: aio_target.c:110
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
#define Min(x, y)
Definition: c.h:975
uint16_t uint16
Definition: c.h:501
uint32_t uint32
Definition: c.h:502
void EmitErrorReport(void)
Definition: elog.c:1709
ErrorContextCallback * error_context_stack
Definition: elog.c:95
sigjmp_buf * PG_exception_stack
Definition: elog.c:97
#define DEBUG4
Definition: elog.h:27
struct Latch * MyLatch
Definition: globals.c:64
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:109
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:65
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:77
void SetLatch(Latch *latch)
Definition: latch.c:288
void ResetLatch(Latch *latch)
Definition: latch.c:372
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1182
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1902
void LWLockReleaseAll(void)
Definition: lwlock.c:1953
@ LW_EXCLUSIVE
Definition: lwlock.h:114
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition: memdebug.h:28
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
static int pgaio_choose_idle_worker(void)
#define IO_WORKER_WAKEUP_FANOUT
Definition: method_worker.c:51
static void pgaio_worker_register(void)
static int MyIoWorkerId
Definition: method_worker.c:97
static uint32 pgaio_worker_submission_queue_consume(void)
static AioWorkerControl * io_worker_control
Definition: method_worker.c:99
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:136
#define START_CRIT_SECTION()
Definition: miscadmin.h:150
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:134
@ B_IO_WORKER
Definition: miscadmin.h:364
#define END_CRIT_SECTION()
Definition: miscadmin.h:152
BackendType MyBackendType
Definition: miscinit.c:64
#define die(msg)
#define pqsignal
Definition: port.h:531
#define sprintf
Definition: port.h:241
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:673
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
uint64 idle_worker_mask
Definition: method_worker.c:71
AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
Definition: method_worker.c:72
struct ErrorContextCallback * previous
Definition: elog.h:297
void(* callback)(void *arg)
Definition: elog.h:298
Definition: latch.h:114
PgAioHandle * io_handles
Definition: aio_internal.h:246
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171

References ErrorContextCallback::arg, Assert(), AuxiliaryProcessMainCommon(), B_IO_WORKER, ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, DEBUG4, die, EmitErrorReport(), END_CRIT_SECTION, error_context_stack, HOLD_INTERRUPTS, i, AioWorkerControl::idle_worker_mask, PgAioCtl::io_handles, io_worker_control, IO_WORKER_WAKEUP_FANOUT, AioWorkerSlot::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWLockReleaseAll(), Min, MyBackendType, MyIoWorkerId, MyLatch, PG_exception_stack, pgaio_choose_idle_worker(), pgaio_ctl, pgaio_debug_io, pgaio_io_call_inj, pgaio_io_get_iovec_length(), pgaio_io_perform_synchronously(), pgaio_io_process_completion(), pgaio_io_reopen(), pgaio_worker_error_callback(), pgaio_worker_register(), pgaio_worker_submission_queue_consume(), pgaio_worker_submission_queue_depth(), pqsignal, ErrorContextCallback::previous, proc_exit(), procsignal_sigusr1_handler(), ResetLatch(), RESUME_INTERRUPTS, set_ps_display(), SetLatch(), ShutdownRequestPending, SIGALRM, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, sprintf, START_CRIT_SECTION, UnBlockSig, VALGRIND_MAKE_MEM_UNDEFINED, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and AioWorkerControl::workers.

◆ pgaio_choose_idle_worker()

static int pgaio_choose_idle_worker ( void  )
static

Definition at line 164 of file method_worker.c.

165{
166 int worker;
167
169 return -1;
170
171 /* Find the lowest bit position, and clear it. */
173 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
174
175 return worker;
176}
static int pg_rightmost_one_pos64(uint64 word)
Definition: pg_bitutils.h:145

References AioWorkerControl::idle_worker_mask, io_worker_control, and pg_rightmost_one_pos64().

Referenced by IoWorkerMain(), and pgaio_worker_submit_internal().

◆ pgaio_worker_control_shmem_size()

static size_t pgaio_worker_control_shmem_size ( void  )
static

Definition at line 113 of file method_worker.c.

114{
115 return offsetof(AioWorkerControl, workers) +
117}
struct AioWorkerSlot AioWorkerSlot
#define MAX_IO_WORKERS
Definition: proc.h:446

References MAX_IO_WORKERS.

Referenced by pgaio_worker_shmem_init(), and pgaio_worker_shmem_size().

◆ pgaio_worker_die()

static void pgaio_worker_die ( int  code,
Datum  arg 
)
static

◆ pgaio_worker_error_callback()

static void pgaio_worker_error_callback ( void *  arg)
static

Definition at line 362 of file method_worker.c.

363{
364 ProcNumber owner;
365 PGPROC *owner_proc;
366 int32 owner_pid;
367 PgAioHandle *ioh = arg;
368
369 if (!ioh)
370 return;
371
374
375 owner = ioh->owner_procno;
376 owner_proc = GetPGProcByNumber(owner);
377 owner_pid = owner_proc->pid;
378
379 errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
380}
int32_t int32
Definition: c.h:498
#define errcontext
Definition: elog.h:197
ProcNumber MyProcNumber
Definition: globals.c:91
void * arg
#define GetPGProcByNumber(n)
Definition: proc.h:424
int ProcNumber
Definition: procnumber.h:24
Definition: proc.h:163
int pid
Definition: proc.h:183
int32 owner_procno
Definition: aio_internal.h:125

References arg, Assert(), B_IO_WORKER, errcontext, GetPGProcByNumber, MyBackendType, MyProcNumber, PgAioHandle::owner_procno, and PGPROC::pid.

Referenced by IoWorkerMain().

◆ pgaio_worker_needs_synchronous_execution()

static bool pgaio_worker_needs_synchronous_execution ( PgAioHandle ioh)
static

Definition at line 233 of file method_worker.c.

234{
235 return
238 || !pgaio_io_can_reopen(ioh);
239}
@ PGAIO_HF_REFERENCES_LOCAL
Definition: aio.h:60
bool pgaio_io_can_reopen(PgAioHandle *ioh)
Definition: aio_target.c:99
bool IsUnderPostmaster
Definition: globals.c:121

References PgAioHandle::flags, IsUnderPostmaster, PGAIO_HF_REFERENCES_LOCAL, and pgaio_io_can_reopen().

Referenced by pgaio_worker_submit_internal().

◆ pgaio_worker_queue_shmem_size()

static size_t pgaio_worker_queue_shmem_size ( int *  queue_size)
static

Definition at line 103 of file method_worker.c.

104{
105 /* Round size up to next power of two so we can make a mask. */
107
108 return offsetof(AioWorkerSubmissionQueue, ios) +
109 sizeof(uint32) * *queue_size;
110}
static int io_worker_queue_size
Definition: method_worker.c:96
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189

References io_worker_queue_size, and pg_nextpower2_32().

Referenced by pgaio_worker_shmem_init(), and pgaio_worker_shmem_size().

◆ pgaio_worker_register()

static void pgaio_worker_register ( void  )
static

Definition at line 328 of file method_worker.c.

329{
330 MyIoWorkerId = -1;
331
332 /*
333 * XXX: This could do with more fine-grained locking. But it's also not
334 * very common for the number of workers to change at the moment...
335 */
336 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
337
338 for (int i = 0; i < MAX_IO_WORKERS; ++i)
339 {
341 {
344 MyIoWorkerId = i;
345 break;
346 }
347 else
349 }
350
351 if (MyIoWorkerId == -1)
352 elog(ERROR, "couldn't find a free worker slot");
353
356 LWLockRelease(AioWorkerSubmissionQueueLock);
357
359}
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:226
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
static void pgaio_worker_die(int code, Datum arg)

References Assert(), elog, ERROR, i, AioWorkerControl::idle_worker_mask, AioWorkerSlot::in_use, io_worker_control, AioWorkerSlot::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAX_IO_WORKERS, MyIoWorkerId, MyLatch, on_shmem_exit(), pgaio_worker_die(), and AioWorkerControl::workers.

Referenced by IoWorkerMain().

◆ pgaio_worker_shmem_init()

static void pgaio_worker_shmem_init ( bool  first_time)
static

Definition at line 132 of file method_worker.c.

133{
134 bool found;
135 int queue_size;
136
138 ShmemInitStruct("AioWorkerSubmissionQueue",
140 &found);
141 if (!found)
142 {
143 io_worker_submission_queue->size = queue_size;
146 }
147
149 ShmemInitStruct("AioWorkerControl",
151 &found);
152 if (!found)
153 {
155 for (int i = 0; i < MAX_IO_WORKERS; ++i)
156 {
159 }
160 }
161}
static size_t pgaio_worker_control_shmem_size(void)
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static AioWorkerSubmissionQueue * io_worker_submission_queue
Definition: method_worker.c:98
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:387

References AioWorkerSubmissionQueue::head, i, AioWorkerControl::idle_worker_mask, AioWorkerSlot::in_use, io_worker_control, io_worker_submission_queue, AioWorkerSlot::latch, MAX_IO_WORKERS, pgaio_worker_control_shmem_size(), pgaio_worker_queue_shmem_size(), ShmemInitStruct(), AioWorkerSubmissionQueue::size, AioWorkerSubmissionQueue::tail, and AioWorkerControl::workers.

◆ pgaio_worker_shmem_size()

static size_t pgaio_worker_shmem_size ( void  )
static

Definition at line 120 of file method_worker.c.

121{
122 size_t sz;
123 int queue_size;
124
125 sz = pgaio_worker_queue_shmem_size(&queue_size);
127
128 return sz;
129}
Size add_size(Size s1, Size s2)
Definition: shmem.c:493

References add_size(), pgaio_worker_control_shmem_size(), and pgaio_worker_queue_shmem_size().

◆ pgaio_worker_submission_queue_consume()

static uint32 pgaio_worker_submission_queue_consume ( void  )
static

Definition at line 200 of file method_worker.c.

201{
203 uint32 result;
204
206 if (queue->tail == queue->head)
207 return UINT32_MAX; /* empty */
208
209 result = queue->ios[queue->tail];
210 queue->tail = (queue->tail + 1) & (queue->size - 1);
211
212 return result;
213}
uint32 ios[FLEXIBLE_ARRAY_MEMBER]
Definition: method_worker.c:60

References AioWorkerSubmissionQueue::head, io_worker_submission_queue, AioWorkerSubmissionQueue::ios, AioWorkerSubmissionQueue::size, and AioWorkerSubmissionQueue::tail.

Referenced by IoWorkerMain().

◆ pgaio_worker_submission_queue_depth()

static uint32 pgaio_worker_submission_queue_depth ( void  )
static

Definition at line 216 of file method_worker.c.

217{
218 uint32 head;
219 uint32 tail;
220
223
224 if (tail > head)
226
227 Assert(head >= tail);
228
229 return head - tail;
230}

References Assert(), AioWorkerSubmissionQueue::head, io_worker_submission_queue, AioWorkerSubmissionQueue::size, and AioWorkerSubmissionQueue::tail.

Referenced by IoWorkerMain().

◆ pgaio_worker_submission_queue_insert()

static bool pgaio_worker_submission_queue_insert ( PgAioHandle ioh)
static

Definition at line 179 of file method_worker.c.

180{
182 uint32 new_head;
183
185 new_head = (queue->head + 1) & (queue->size - 1);
186 if (new_head == queue->tail)
187 {
188 pgaio_debug(DEBUG3, "io queue is full, at %u elements",
190 return false; /* full */
191 }
192
193 queue->ios[queue->head] = pgaio_io_get_id(ioh);
194 queue->head = new_head;
195
196 return true;
197}
int pgaio_io_get_id(PgAioHandle *ioh)
Definition: aio.c:330
#define pgaio_debug(elevel, msg,...)
Definition: aio_internal.h:376
#define DEBUG3
Definition: elog.h:28

References DEBUG3, AioWorkerSubmissionQueue::head, io_worker_submission_queue, AioWorkerSubmissionQueue::ios, pgaio_debug, pgaio_io_get_id(), AioWorkerSubmissionQueue::size, and AioWorkerSubmissionQueue::tail.

Referenced by pgaio_worker_submit_internal().

◆ pgaio_worker_submit()

static int pgaio_worker_submit ( uint16  num_staged_ios,
PgAioHandle **  staged_ios 
)
static

Definition at line 293 of file method_worker.c.

294{
295 for (int i = 0; i < num_staged_ios; i++)
296 {
297 PgAioHandle *ioh = staged_ios[i];
298
300 }
301
302 pgaio_worker_submit_internal(num_staged_ios, staged_ios);
303
304 return num_staged_ios;
305}
void pgaio_io_prepare_submit(PgAioHandle *ioh)
Definition: aio.c:482
static void pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])

References i, pgaio_io_prepare_submit(), and pgaio_worker_submit_internal().

◆ pgaio_worker_submit_internal()

static void pgaio_worker_submit_internal ( int  nios,
PgAioHandle ios[] 
)
static

Definition at line 242 of file method_worker.c.

243{
244 PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
245 int nsync = 0;
246 Latch *wakeup = NULL;
247 int worker;
248
250
251 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
252 for (int i = 0; i < nios; ++i)
253 {
256 {
257 /*
258 * We'll do it synchronously, but only after we've sent as many as
259 * we can to workers, to maximize concurrency.
260 */
261 synchronous_ios[nsync++] = ios[i];
262 continue;
263 }
264
265 if (wakeup == NULL)
266 {
267 /* Choose an idle worker to wake up if we haven't already. */
268 worker = pgaio_choose_idle_worker();
269 if (worker >= 0)
271
272 pgaio_debug_io(DEBUG4, ios[i],
273 "choosing worker %d",
274 worker);
275 }
276 }
277 LWLockRelease(AioWorkerSubmissionQueueLock);
278
279 if (wakeup)
281
282 /* Run whatever is left synchronously. */
283 if (nsync > 0)
284 {
285 for (int i = 0; i < nsync; ++i)
286 {
287 pgaio_io_perform_synchronously(synchronous_ios[i]);
288 }
289 }
290}
#define PGAIO_SUBMIT_BATCH_SIZE
Definition: aio_internal.h:28
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:130

References Assert(), DEBUG4, i, io_worker_control, AioWorkerSlot::latch, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), pgaio_choose_idle_worker(), pgaio_debug_io, pgaio_io_perform_synchronously(), PGAIO_SUBMIT_BATCH_SIZE, pgaio_worker_needs_synchronous_execution(), pgaio_worker_submission_queue_insert(), SetLatch(), wakeup, and AioWorkerControl::workers.

Referenced by pgaio_worker_submit().

◆ pgaio_workers_enabled()

bool pgaio_workers_enabled ( void  )

Definition at line 577 of file method_worker.c.

578{
579 return io_method == IOMETHOD_WORKER;
580}
int io_method
Definition: aio.c:77
@ IOMETHOD_WORKER
Definition: aio.h:35

References io_method, and IOMETHOD_WORKER.

Referenced by maybe_adjust_io_workers().

Variable Documentation

◆ io_worker_control

◆ io_worker_queue_size

int io_worker_queue_size = 64
static

Definition at line 96 of file method_worker.c.

Referenced by pgaio_worker_queue_shmem_size().

◆ io_worker_submission_queue

◆ io_workers

int io_workers = 3

Definition at line 93 of file method_worker.c.

Referenced by maybe_adjust_io_workers().

◆ MyIoWorkerId

int MyIoWorkerId
static

Definition at line 97 of file method_worker.c.

Referenced by IoWorkerMain(), pgaio_worker_die(), and pgaio_worker_register().

◆ pgaio_worker_ops

const IoMethodOps pgaio_worker_ops
Initial value:
= {
.shmem_size = pgaio_worker_shmem_size,
.shmem_init = pgaio_worker_shmem_init,
.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
}
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static size_t pgaio_worker_shmem_size(void)
static void pgaio_worker_shmem_init(bool first_time)

Definition at line 83 of file method_worker.c.