PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
method_worker.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * method_worker.c
4 * AIO - perform AIO using worker processes
5 *
6 * IO workers consume IOs from a shared memory submission queue, run
7 * traditional synchronous system calls, and perform the shared completion
8 * handling immediately. Client code submits most requests by pushing IOs
9 * into the submission queue, and waits (if necessary) using condition
10 * variables. Some IOs cannot be performed in another process due to lack of
11 * infrastructure for reopening the file, and must be processed synchronously by
12 * the client code when submitted.
13 *
14 * So that the submitter can make just one system call when submitting a batch
15 * of IOs, wakeups "fan out"; each woken IO worker can wake two more. XXX This
16 * could be improved by using futexes instead of latches to wake N waiters.
17 *
18 * This method of AIO is available in all builds on all operating systems, and
19 * is the default.
20 *
21 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
22 * Portions Copyright (c) 1994, Regents of the University of California
23 *
24 * IDENTIFICATION
25 * src/backend/storage/aio/method_worker.c
26 *
27 *-------------------------------------------------------------------------
28 */
29
30#include "postgres.h"
31
32#include "libpq/pqsignal.h"
33#include "miscadmin.h"
34#include "port/pg_bitutils.h"
37#include "storage/aio.h"
39#include "storage/aio_subsys.h"
40#include "storage/io_worker.h"
41#include "storage/ipc.h"
42#include "storage/latch.h"
43#include "storage/proc.h"
44#include "tcop/tcopprot.h"
45#include "utils/ps_status.h"
46#include "utils/wait_event.h"
47
48
49/* How many workers should each worker wake up if needed? */
50#define IO_WORKER_WAKEUP_FANOUT 2
51
52
54{
61
62typedef struct AioWorkerSlot
63{
65 bool in_use;
67
68typedef struct AioWorkerControl
69{
73
74
75static size_t pgaio_worker_shmem_size(void);
76static void pgaio_worker_shmem_init(bool first_time);
77
79static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
80
81
84 .shmem_init = pgaio_worker_shmem_init,
85
86 .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
87 .submit = pgaio_worker_submit,
88};
89
90
91/* GUCs */
92int io_workers = 3;
93
94
95static int io_worker_queue_size = 64;
96static int MyIoWorkerId;
99
100
101static size_t
103{
104 /* Round size up to next power of two so we can make a mask. */
106
107 return offsetof(AioWorkerSubmissionQueue, ios) +
108 sizeof(uint32) * *queue_size;
109}
110
111static size_t
113{
114 return offsetof(AioWorkerControl, workers) +
116}
117
118static size_t
120{
121 size_t sz;
122 int queue_size;
123
124 sz = pgaio_worker_queue_shmem_size(&queue_size);
126
127 return sz;
128}
129
130static void
132{
133 bool found;
134 int queue_size;
135
137 ShmemInitStruct("AioWorkerSubmissionQueue",
139 &found);
140 if (!found)
141 {
142 io_worker_submission_queue->size = queue_size;
145 }
146
148 ShmemInitStruct("AioWorkerControl",
150 &found);
151 if (!found)
152 {
154 for (int i = 0; i < MAX_IO_WORKERS; ++i)
155 {
158 }
159 }
160}
161
162static int
164{
165 int worker;
166
168 return -1;
169
170 /* Find the lowest bit position, and clear it. */
172 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
173
174 return worker;
175}
176
177static bool
179{
181 uint32 new_head;
182
184 new_head = (queue->head + 1) & (queue->size - 1);
185 if (new_head == queue->tail)
186 {
187 pgaio_debug(DEBUG3, "io queue is full, at %u elements",
189 return false; /* full */
190 }
191
192 queue->ios[queue->head] = pgaio_io_get_id(ioh);
193 queue->head = new_head;
194
195 return true;
196}
197
198static uint32
200{
202 uint32 result;
203
205 if (queue->tail == queue->head)
206 return UINT32_MAX; /* empty */
207
208 result = queue->ios[queue->tail];
209 queue->tail = (queue->tail + 1) & (queue->size - 1);
210
211 return result;
212}
213
214static uint32
216{
217 uint32 head;
218 uint32 tail;
219
222
223 if (tail > head)
225
226 Assert(head >= tail);
227
228 return head - tail;
229}
230
231static bool
233{
234 return
237 || !pgaio_io_can_reopen(ioh);
238}
239
240static void
242{
243 PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
244 int nsync = 0;
245 Latch *wakeup = NULL;
246 int worker;
247
249
250 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
251 for (int i = 0; i < nios; ++i)
252 {
255 {
256 /*
257 * We'll do it synchronously, but only after we've sent as many as
258 * we can to workers, to maximize concurrency.
259 */
260 synchronous_ios[nsync++] = ios[i];
261 continue;
262 }
263
264 if (wakeup == NULL)
265 {
266 /* Choose an idle worker to wake up if we haven't already. */
267 worker = pgaio_choose_idle_worker();
268 if (worker >= 0)
270
271 pgaio_debug_io(DEBUG4, ios[i],
272 "choosing worker %d",
273 worker);
274 }
275 }
276 LWLockRelease(AioWorkerSubmissionQueueLock);
277
278 if (wakeup)
280
281 /* Run whatever is left synchronously. */
282 if (nsync > 0)
283 {
284 for (int i = 0; i < nsync; ++i)
285 {
286 pgaio_io_perform_synchronously(synchronous_ios[i]);
287 }
288 }
289}
290
291static int
292pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
293{
294 for (int i = 0; i < num_staged_ios; i++)
295 {
296 PgAioHandle *ioh = staged_ios[i];
297
299 }
300
301 pgaio_worker_submit_internal(num_staged_ios, staged_ios);
302
303 return num_staged_ios;
304}
305
306/*
307 * on_shmem_exit() callback that releases the worker's slot in
308 * io_worker_control.
309 */
310static void
312{
313 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
316
319 LWLockRelease(AioWorkerSubmissionQueueLock);
320}
321
322/*
323 * Register the worker in shared memory, assign MyIoWorkerId and register a
324 * shutdown callback to release the registration.
325 */
326static void
328{
329 MyIoWorkerId = -1;
330
331 /*
332 * XXX: This could do with more fine-grained locking. But it's also not
333 * very common for the number of workers to change at the moment...
334 */
335 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
336
337 for (int i = 0; i < MAX_IO_WORKERS; ++i)
338 {
340 {
343 MyIoWorkerId = i;
344 break;
345 }
346 else
348 }
349
350 if (MyIoWorkerId == -1)
351 elog(ERROR, "couldn't find a free worker slot");
352
355 LWLockRelease(AioWorkerSubmissionQueueLock);
356
358}
359
360void
361IoWorkerMain(const void *startup_data, size_t startup_data_len)
362{
363 sigjmp_buf local_sigjmp_buf;
364 PgAioHandle *volatile error_ioh = NULL;
365 volatile int error_errno = 0;
366 char cmd[128];
367
370
372 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
373
374 /*
375 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
376 * shutdown sequence, similar to checkpointer.
377 */
378 pqsignal(SIGTERM, SIG_IGN);
379 /* SIGQUIT handler was already set up by InitPostmasterChild */
380 pqsignal(SIGALRM, SIG_IGN);
381 pqsignal(SIGPIPE, SIG_IGN);
384
385 /* also registers a shutdown callback to unregister */
387
388 sprintf(cmd, "%d", MyIoWorkerId);
389 set_ps_display(cmd);
390
391 /* see PostgresMain() */
392 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
393 {
394 error_context_stack = NULL;
396
398
399 /*
400 * In the - very unlikely - case that the IO failed in a way that
401 * raises an error we need to mark the IO as failed.
402 *
403 * Need to do just enough error recovery so that we can mark the IO as
404 * failed and then exit (postmaster will start a new worker).
405 */
407
408 if (error_ioh != NULL)
409 {
410 /* should never fail without setting error_errno */
411 Assert(error_errno != 0);
412
413 errno = error_errno;
414
416 pgaio_io_process_completion(error_ioh, -error_errno);
418 }
419
420 proc_exit(1);
421 }
422
423 /* We can now handle ereport(ERROR) */
424 PG_exception_stack = &local_sigjmp_buf;
425
426 sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
427
429 {
430 uint32 io_index;
432 int nlatches = 0;
433 int nwakeups = 0;
434 int worker;
435
436 /* Try to get a job to do. */
437 LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
438 if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
439 {
440 /*
441 * Nothing to do. Mark self idle.
442 *
443 * XXX: Invent some kind of back pressure to reduce useless
444 * wakeups?
445 */
447 }
448 else
449 {
450 /* Got one. Clear idle flag. */
451 io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
452
453 /* See if we can wake up some peers. */
456 for (int i = 0; i < nwakeups; ++i)
457 {
458 if ((worker = pgaio_choose_idle_worker()) < 0)
459 break;
460 latches[nlatches++] = io_worker_control->workers[worker].latch;
461 }
462 }
463 LWLockRelease(AioWorkerSubmissionQueueLock);
464
465 for (int i = 0; i < nlatches; ++i)
466 SetLatch(latches[i]);
467
468 if (io_index != UINT32_MAX)
469 {
470 PgAioHandle *ioh = NULL;
471
472 ioh = &pgaio_ctl->io_handles[io_index];
473 error_ioh = ioh;
474
476 "worker %d processing IO",
478
479 /*
480 * Prevent interrupts between pgaio_io_reopen() and
481 * pgaio_io_perform_synchronously() that otherwise could lead to
482 * the FD getting closed in that window.
483 */
485
486 /*
487 * It's very unlikely, but possible, that reopen fails. E.g. due
488 * to memory allocations failing or file permissions changing or
489 * such. In that case we need to fail the IO.
490 *
491 * There's not really a good errno we can report here.
492 */
493 error_errno = ENOENT;
494 pgaio_io_reopen(ioh);
495
496 /*
497 * To be able to exercise the reopen-fails path, allow injection
498 * points to trigger a failure at this point.
499 */
500 pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN");
501
502 error_errno = 0;
503 error_ioh = NULL;
504
505 /*
506 * We don't expect this to ever fail with ERROR or FATAL, no need
507 * to keep error_ioh set to the IO.
508 * pgaio_io_perform_synchronously() contains a critical section to
509 * ensure we don't accidentally fail.
510 */
512
514 }
515 else
516 {
518 WAIT_EVENT_IO_WORKER_MAIN);
520 }
521
523 }
524
525 proc_exit(0);
526}
527
528bool
530{
531 return io_method == IOMETHOD_WORKER;
532}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition: aio.c:498
int io_method
Definition: aio.c:75
int pgaio_io_get_id(PgAioHandle *ioh)
Definition: aio.c:328
PgAioCtl * pgaio_ctl
Definition: aio.c:79
void pgaio_io_prepare_submit(PgAioHandle *ioh)
Definition: aio.c:480
@ IOMETHOD_WORKER
Definition: aio.h:35
@ PGAIO_HF_REFERENCES_LOCAL
Definition: aio.h:60
#define pgaio_io_call_inj(ioh, injection_point)
Definition: aio_internal.h:386
#define pgaio_debug(elevel, msg,...)
Definition: aio_internal.h:355
#define pgaio_debug_io(elevel, ioh, msg,...)
Definition: aio_internal.h:368
#define PGAIO_SUBMIT_BATCH_SIZE
Definition: aio_internal.h:28
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition: aio_io.c:116
void pgaio_io_reopen(PgAioHandle *ioh)
Definition: aio_target.c:108
bool pgaio_io_can_reopen(PgAioHandle *ioh)
Definition: aio_target.c:97
void AuxiliaryProcessMainCommon(void)
Definition: auxprocess.c:39
sigset_t UnBlockSig
Definition: pqsignal.c:22
#define Min(x, y)
Definition: c.h:975
#define FLEXIBLE_ARRAY_MEMBER
Definition: c.h:434
uint64_t uint64
Definition: c.h:503
uint16_t uint16
Definition: c.h:501
uint32_t uint32
Definition: c.h:502
void EmitErrorReport(void)
Definition: elog.c:1687
ErrorContextCallback * error_context_stack
Definition: elog.c:94
sigjmp_buf * PG_exception_stack
Definition: elog.c:96
#define DEBUG3
Definition: elog.h:28
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define DEBUG4
Definition: elog.h:27
bool IsUnderPostmaster
Definition: globals.c:119
struct Latch * MyLatch
Definition: globals.c:62
Assert(PointerIsAligned(start, uint64))
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition: interrupt.c:105
volatile sig_atomic_t ShutdownRequestPending
Definition: interrupt.c:28
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
void proc_exit(int code)
Definition: ipc.c:104
int i
Definition: isn.c:77
void SetLatch(Latch *latch)
Definition: latch.c:288
void ResetLatch(Latch *latch)
Definition: latch.c:372
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1180
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1900
void LWLockReleaseAll(void)
Definition: lwlock.c:1951
@ LW_EXCLUSIVE
Definition: lwlock.h:114
static size_t pgaio_worker_control_shmem_size(void)
static uint32 pgaio_worker_submission_queue_depth(void)
struct AioWorkerControl AioWorkerControl
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static int pgaio_choose_idle_worker(void)
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
#define IO_WORKER_WAKEUP_FANOUT
Definition: method_worker.c:50
static size_t pgaio_worker_shmem_size(void)
static size_t pgaio_worker_queue_shmem_size(int *queue_size)
static AioWorkerSubmissionQueue * io_worker_submission_queue
Definition: method_worker.c:97
static int io_worker_queue_size
Definition: method_worker.c:95
static void pgaio_worker_register(void)
static int MyIoWorkerId
Definition: method_worker.c:96
struct AioWorkerSubmissionQueue AioWorkerSubmissionQueue
const IoMethodOps pgaio_worker_ops
Definition: method_worker.c:82
static void pgaio_worker_die(int code, Datum arg)
struct AioWorkerSlot AioWorkerSlot
static uint32 pgaio_worker_submission_queue_consume(void)
static void pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
bool pgaio_workers_enabled(void)
void IoWorkerMain(const void *startup_data, size_t startup_data_len)
static AioWorkerControl * io_worker_control
Definition: method_worker.c:98
static void pgaio_worker_shmem_init(bool first_time)
int io_workers
Definition: method_worker.c:92
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
@ B_IO_WORKER
Definition: miscadmin.h:363
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
BackendType MyBackendType
Definition: miscinit.c:64
void * arg
static int pg_rightmost_one_pos64(uint64 word)
Definition: pg_bitutils.h:145
static uint32 pg_nextpower2_32(uint32 num)
Definition: pg_bitutils.h:189
#define die(msg)
#define pqsignal
Definition: port.h:521
#define sprintf
Definition: port.h:241
uintptr_t Datum
Definition: postgres.h:69
#define MAX_IO_WORKERS
Definition: proc.h:446
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:673
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
uint64 idle_worker_mask
Definition: method_worker.c:70
AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
Definition: method_worker.c:71
uint32 ios[FLEXIBLE_ARRAY_MEMBER]
Definition: method_worker.c:59
size_t(* shmem_size)(void)
Definition: aio_internal.h:266
Definition: latch.h:114
PgAioHandle * io_handles
Definition: aio_internal.h:241
#define WL_EXIT_ON_PM_DEATH
Definition: waiteventset.h:39
#define WL_LATCH_SET
Definition: waiteventset.h:34
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]
Definition: walreceiver.c:129
#define SIGHUP
Definition: win32_port.h:158
#define SIGPIPE
Definition: win32_port.h:163
#define SIGUSR1
Definition: win32_port.h:170
#define SIGALRM
Definition: win32_port.h:164
#define SIGUSR2
Definition: win32_port.h:171