PostgreSQL Source Code git master
Loading...
Searching...
No Matches
method_worker.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * method_worker.c
4 * AIO - perform AIO using worker processes
5 *
6 * IO workers consume IOs from a shared memory submission queue, run
7 * traditional synchronous system calls, and perform the shared completion
8 * handling immediately. Client code submits most requests by pushing IOs
9 * into the submission queue, and waits (if necessary) using condition
10 * variables. Some IOs cannot be performed in another process due to lack of
11 * infrastructure for reopening the file, and must be processed synchronously by
12 * the client code when submitted.
13 *
14 * The pool of workers tries to stabilize at a size that can handle recently
15 * seen variation in demand, within the configured limits.
16 *
17 * This method of AIO is available in all builds on all operating systems, and
18 * is the default.
19 *
20 * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
21 * Portions Copyright (c) 1994, Regents of the University of California
22 *
23 * IDENTIFICATION
24 * src/backend/storage/aio/method_worker.c
25 *
26 *-------------------------------------------------------------------------
27 */
28
29#include "postgres.h"
30
31#include <limits.h>
32
33#include "libpq/pqsignal.h"
34#include "miscadmin.h"
35#include "port/pg_bitutils.h"
38#include "storage/aio.h"
40#include "storage/aio_subsys.h"
41#include "storage/io_worker.h"
42#include "storage/ipc.h"
43#include "storage/latch.h"
44#include "storage/lwlock.h"
45#include "storage/pmsignal.h"
46#include "storage/proc.h"
47#include "storage/shmem.h"
48#include "tcop/tcopprot.h"
50#include "utils/memdebug.h"
51#include "utils/ps_status.h"
52#include "utils/wait_event.h"
53
54/*
55 * Saturation for counters used to estimate wakeup:IO ratio.
56 *
57 * We maintain hist_wakeups for wakeups received and hist_ios for IOs
58 * processed by each worker. When either counter reaches this saturation
59 * value, we divide both by two. The result is an exponentially decaying
60 * ratio of wakeups to IOs, with a very short memory.
61 *
62 * If a worker is itself experiencing useless wakeups, it assumes that
63 * higher-numbered workers would experience even more, so it should end the
64 * chain.
65 */
66#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE 4
67
68/* Debugging support: show current IO and wakeups:ios statistics in ps. */
69/* #define PGAIO_WORKER_SHOW_PS_INFO */
70
78
83
84/*
85 * Sets of worker IDs are held in a simple bitmap, accessed through functions
86 * that provide a more readable abstraction. If we wanted to support more
87 * workers than that, the contention on the single queue would surely get too
88 * high, so we might want to consider multiple pools instead of widening this.
89 */
91
92#define PGAIO_WORKERSET_BITS (sizeof(PgAioWorkerSet) * CHAR_BIT)
93
94static_assert(PGAIO_WORKERSET_BITS >= MAX_IO_WORKERS, "too small");
95
96typedef struct PgAioWorkerControl
97{
98 /* Seen by postmaster */
99 bool grow;
101
102 /* Protected by AioWorkerSubmissionQueueLock. */
104
105 /* Protected by AioWorkerControlLock. */
108
109 /* Protected by AioWorkerControlLock. */
112
113
114static void pgaio_worker_shmem_request(void *arg);
115static void pgaio_worker_shmem_init(void *arg);
116
118static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
119
120
123 .shmem_callbacks.init_fn = pgaio_worker_shmem_init,
124
125 .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
126 .submit = pgaio_worker_submit,
127};
128
129
130/* GUCs */
135
136
137static int io_worker_queue_size = 64;
138static int MyIoWorkerId = -1;
141
142
143static void
145{
146 *set = 0;
147}
148
149static bool
151{
152 return *set == 0;
153}
154
155static PgAioWorkerSet
157{
158 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
159 return UINT64_C(1) << worker;
160}
161
162static void
167
168static void
173
174static void
176{
177 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
178 *set |= pgaio_workerset_singleton(worker);
179}
180
181static void
183{
184 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
185 *set &= ~pgaio_workerset_singleton(worker);
186}
187
188static void
190{
191 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
192 *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
193}
194
195static int
201
202static int
208
209static int
211{
212 int worker = pgaio_workerset_get_lowest(set);
213
214 pgaio_workerset_remove(set, worker);
215 return worker;
216}
217
218#ifdef USE_ASSERT_CHECKING
219static bool
221{
222 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
223 return (*set & pgaio_workerset_singleton(worker)) != 0;
224}
225
226static int
228{
229 return pg_popcount64(*set);
230}
231#endif
232
233static void
235{
236 size_t size;
237 int queue_size;
238
239 /* Round size up to next power of two so we can make a mask. */
241
242 size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
243 ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
244 .size = size,
245 .ptr = (void **) &io_worker_submission_queue,
246 );
247
248 size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
249 ShmemRequestStruct(.name = "AioWorkerControl",
250 .size = size,
251 .ptr = (void **) &io_worker_control,
252 );
253}
254
255static void
257{
258 int queue_size;
259
260 /* Round size up like in pgaio_worker_shmem_request() */
262
263 io_worker_submission_queue->size = queue_size;
266 io_worker_control->grow = false;
269
270 for (int i = 0; i < MAX_IO_WORKERS; ++i)
272}
273
274/*
275 * Tell postmaster that we think a new worker is needed.
276 */
277static void
279{
280 /*
281 * Suppress useless signaling if we already know that we're at the
282 * maximum. This uses an unlocked read of nworkers, but that's OK for
283 * this heuristic purpose.
284 */
286 return;
287
288 /* Already requested? */
290 return;
291
292 io_worker_control->grow = true;
294
295 /*
296 * If the postmaster has already been signaled, don't do it again until
297 * the postmaster clears this flag. There is no point in repeated signals
298 * if grow is being set and cleared repeatedly while the postmaster is
299 * waiting for io_worker_launch_interval, which it applies even to
300 * canceled requests.
301 */
303 return;
304
308}
309
310/*
311 * Cancel any request for a new worker, after observing an empty queue.
312 */
313static void
315{
317 return;
318
319 io_worker_control->grow = false;
321}
322
323/*
324 * Called by the postmaster to check if a new worker has been requested (but
325 * possibly canceled since).
326 */
327bool
333
334/*
335 * Called by the postmaster to check if a new worker has been requested and
336 * not canceled since.
337 */
338bool
344
345/*
346 * Called by the postmaster to clear the request for a new worker.
347 */
348void
358
359static int
361{
362 PgAioWorkerSet workerset;
363 int worker;
364
366
368 if (only_workers_above >= 0)
370 if (pgaio_workerset_is_empty(&workerset))
371 return -1;
372
373 /* Find the lowest numbered idle worker and mark it not idle. */
374 worker = pgaio_workerset_get_lowest(&workerset);
376
377 return worker;
378}
379
380/*
381 * Try to wake a worker by setting its latch, to tell it there are IOs to
382 * process in the submission queue.
383 */
384static void
386{
387 ProcNumber proc_number;
388
389 /*
390 * If the selected worker is concurrently exiting, then pgaio_worker_die()
391 * had not yet removed it as of when we saw it in idle_workerset. That's
392 * OK, because it will wake all remaining workers to close wakeup-vs-exit
393 * races: *someone* will see the queued IO. If there are no workers
394 * running, the postmaster will start a new one.
395 */
396 proc_number = io_worker_control->workers[worker].proc_number;
397 if (proc_number != INVALID_PROC_NUMBER)
398 SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
399}
400
401/*
402 * Try to wake a set of workers. Used on pool change, to close races
403 * described in the callers.
404 */
405static void
411
412static bool
414{
417
419
421 new_head = (queue->head + 1) & (queue->size - 1);
422 if (new_head == queue->tail)
423 {
424 pgaio_debug(DEBUG3, "io queue is full, at %u elements",
426 return false; /* full */
427 }
428
429 queue->sqes[queue->head] = pgaio_io_get_id(ioh);
430 queue->head = new_head;
431
432 return true;
433}
434
435static int
437{
439 int result;
440
442
444 if (queue->tail == queue->head)
445 return -1; /* empty */
446
447 result = queue->sqes[queue->tail];
448 queue->tail = (queue->tail + 1) & (queue->size - 1);
449
450 return result;
451}
452
453static uint32
455{
456 uint32 head;
457 uint32 tail;
458
460
463
464 if (tail > head)
466
467 Assert(head >= tail);
468
469 return head - tail;
470}
471
472static bool
480
481static int
482pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
483{
485 int nsync = 0;
486 int worker = -1;
487
488 Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
489
490 for (int i = 0; i < num_staged_ios; i++)
491 pgaio_io_prepare_submit(staged_ios[i]);
492
494 {
495 for (int i = 0; i < num_staged_ios; ++i)
496 {
498 if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
499 {
500 /*
501 * Do the rest synchronously. If the queue is full, give up
502 * and do the rest synchronously. We're holding an exclusive
503 * lock on the queue so nothing can consume entries.
504 */
505 synchronous_ios = &staged_ios[i];
506 nsync = (num_staged_ios - i);
507
508 break;
509 }
510 }
511 /* Choose one worker to wake for this batch. */
512 worker = pgaio_worker_choose_idle(-1);
514
515 /* Wake up chosen worker. It will wake peers if necessary. */
516 if (worker != -1)
517 pgaio_worker_wake(worker);
518 }
519 else
520 {
521 /* do everything synchronously, no wakeup needed */
522 synchronous_ios = staged_ios;
523 nsync = num_staged_ios;
524 }
525
526 /* Run whatever is left synchronously. */
527 if (nsync > 0)
528 {
529 for (int i = 0; i < nsync; ++i)
530 {
532 }
533 }
534
535 return num_staged_ios;
536}
537
538/*
539 * on_shmem_exit() callback that releases the worker's slot in
540 * io_worker_control.
541 */
542static void
570
571/*
572 * Register the worker in shared memory, assign MyIoWorkerId and register a
573 * shutdown callback to release registration.
574 */
575static void
613
614static void
616{
617 ProcNumber owner;
619 int32 owner_pid;
621
622 if (!ioh)
623 return;
624
625 Assert(ioh->owner_procno != MyProcNumber);
627
628 owner = ioh->owner_procno;
630 owner_pid = owner_proc->pid;
631
632 errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
633}
634
635/*
636 * Check if this backend is allowed to time out, and thus should use a
637 * non-infinite sleep time. Only the highest-numbered worker is allowed to
638 * time out, and only if the pool is above io_min_workers. Serializing
639 * timeouts keeps IDs in a range 0..N without gaps, and avoids undershooting
640 * io_min_workers.
641 *
642 * The result is only instantaneously true and may be temporarily inconsistent
643 * in different workers around transitions, but all workers are woken up on
644 * pool size or GUC changes making the result eventually consistent.
645 */
646static bool
648{
649 PgAioWorkerSet workerset;
650
652 return false;
653
654 /* Serialize against pool size changes. */
656 workerset = io_worker_control->workerset;
658
659 if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
660 return false;
661
662 return true;
663}
664
665void
667{
670 int timeout_guc_used = 0;
671 PgAioHandle *volatile error_ioh = NULL;
672 ErrorContextCallback errcallback = {0};
673 volatile int error_errno = 0;
674 char cmd[128];
675 int hist_ios = 0;
676 int hist_wakeups = 0;
677
679
681 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
682
683 /*
684 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
685 * shutdown sequence, similar to checkpointer.
686 */
688 /* SIGQUIT handler was already set up by InitPostmasterChild */
693
694 /* also registers a shutdown callback to unregister */
696
697 sprintf(cmd, "%d", MyIoWorkerId);
698 set_ps_display(cmd);
699
701 errcallback.previous = error_context_stack;
702 error_context_stack = &errcallback;
703
704 /* see PostgresMain() */
705 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
706 {
709
711
712 /*
713 * In the - very unlikely - case that the IO failed in a way that
714 * raises an error we need to mark the IO as failed.
715 *
716 * Need to do just enough error recovery so that we can mark the IO as
717 * failed and then exit (postmaster will start a new worker).
718 */
720
721 if (error_ioh != NULL)
722 {
723 /* should never fail without setting error_errno */
724 Assert(error_errno != 0);
725
727
731 }
732
733 proc_exit(1);
734 }
735
736 /* We can now handle ereport(ERROR) */
738
740
742 {
744 int worker = -1;
745 int queue_depth = 0;
746 bool maybe_grow = false;
747
748 /*
749 * Try to get a job to do.
750 *
751 * The lwlock acquisition also provides the necessary memory barrier
752 * to ensure that we don't see an outdated data in the handle.
753 */
756 {
757 /* Nothing to do. Mark self idle. */
760 }
761 else
762 {
763 /* Got one. Clear idle flag. */
766
767 /*
768 * See if we should wake up a higher numbered peer. Only do that
769 * if this worker is not receiving spurious wakeups itself. The
770 * intention is to create a frontier beyond which idle workers stay
771 * asleep.
772 *
773 * This heuristic tries to discover the useful wakeup propagation
774 * chain length when IOs are very fast and workers wake up to find
775 * that all IOs have already been taken.
776 *
777 * If we chose not to wake a worker when we ideally should have,
778 * then the ratio will soon change to correct that.
779 */
780 if (hist_wakeups <= hist_ios)
781 {
783 if (queue_depth > 0)
784 {
785 /* Choose a worker higher than me to wake. */
787 if (worker == -1)
788 maybe_grow = true;
789 }
790 }
791 }
793
794 /* Propagate wakeups. */
795 if (worker != -1)
796 {
797 pgaio_worker_wake(worker);
798 }
799 else if (maybe_grow)
800 {
801 /*
802 * We know there was at least one more item in the queue, and we
803 * failed to find a higher-numbered idle worker to wake. Now we
804 * decide if we should try to start one more worker.
805 *
806 * We do this with a simple heuristic: is the queue depth greater
807 * than the current number of workers?
808 *
809 * Consider the following situations:
810 *
811 * 1. The queue depth is constantly increasing, because IOs are
812 * arriving faster than they can possibly be serviced. It doesn't
813 * matter much which threshold we choose, as we will surely hit
814 * it. Crossing the current worker count is a useful signal
815 * because it's clearly too deep to avoid queuing latency already,
816 * but still leaves a small window of opportunity to improve the
817 * situation before the queue overflows.
818 *
819 * 2. The worker pool is keeping up, no latency is being
820 * introduced and an extra worker would be a waste of resources.
821 * Queue depth distributions tend to be heavily skewed, with long
822 * tails of low probability spikes (due to submission clustering,
823 * scheduling, jitter, stalls, noisy neighbors, etc). We want a
824 * number that is very unlikely to be triggered by an outlier, and
825 * we bet that an exponential or similar distribution whose
826 * outliers never reach this threshold must be almost entirely
827 * concentrated at the low end. If we do see a spike as big as
828 * the worker count, we take it as a signal that the distribution
829 * is surely too wide.
830 *
831 * On its own, this is an extremely crude signal. When combined
832 * with the wakeup propagation test that precedes it (but on its
833 * own tends to overshoot) and io_worker_launch_delay, the result
834 * is that we gradually test each pool size until we find one that
835 * doesn't trigger further expansion, and then hold it for at
836 * least io_worker_idle_timeout.
837 *
838 * XXX Perhaps ideas from queueing theory or control theory could
839 * do a better job of this.
840 */
841
842 /* Read nworkers without lock for this heuristic purpose. */
845 }
846
847 if (io_index != -1)
848 {
850
851 /* Cancel timeout and update wakeup:work ratio. */
854 {
855 hist_wakeups /= 2;
856 hist_ios /= 2;
857 }
858
860 error_ioh = ioh;
861 errcallback.arg = ioh;
862
864 "worker %d processing IO",
866
867 /*
868 * Prevent interrupts between pgaio_io_reopen() and
869 * pgaio_io_perform_synchronously() that otherwise could lead to
870 * the FD getting closed in that window.
871 */
873
874 /*
875 * It's very unlikely, but possible, that reopen fails. E.g. due
876 * to memory allocations failing or file permissions changing or
877 * such. In that case we need to fail the IO.
878 *
879 * There's not really a good errno we can report here.
880 */
883
884 /*
885 * To be able to exercise the reopen-fails path, allow injection
886 * points to trigger a failure at this point.
887 */
888 INJECTION_POINT("aio-worker-after-reopen", ioh);
889
890 error_errno = 0;
891 error_ioh = NULL;
892
893 /*
894 * As part of IO completion the buffer will be marked as NOACCESS,
895 * until the buffer is pinned again - which never happens in io
896 * workers. Therefore the next time there is IO for the same
897 * buffer, the memory will be considered inaccessible. To avoid
898 * that, explicitly allow access to the memory before reading data
899 * into it.
900 */
901#ifdef USE_VALGRIND
902 {
903 struct iovec *iov;
904 uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
905
906 for (int i = 0; i < iov_length; i++)
908 }
909#endif
910
911#ifdef PGAIO_WORKER_SHOW_PS_INFO
912 {
914
915 sprintf(cmd, "%d: [%s] %s",
920 set_ps_display(cmd);
921 }
922#endif
923
924 /*
925 * We don't expect this to ever fail with ERROR or FATAL, no need
926 * to keep error_ioh set to the IO.
927 * pgaio_io_perform_synchronously() contains a critical section to
928 * ensure we don't accidentally fail.
929 */
931
933 errcallback.arg = NULL;
934 }
935 else
936 {
937 int timeout_ms;
938
939 /* Cancel new worker request if pending. */
941
942 /* Compute the remaining allowed idle time. */
943 if (io_worker_idle_timeout == -1)
944 {
945 /* Never time out. */
946 timeout_ms = -1;
947 }
948 else
949 {
951
952 /* If the GUC changes, reset timer. */
953 if (idle_timeout_abs != 0 &&
956
957 /* Only the highest-numbered worker can time out. */
959 {
960 if (idle_timeout_abs == 0)
961 {
962 /*
963 * I have just been promoted to the timeout worker, or
964 * the GUC changed. Compute new absolute time from
965 * now.
966 */
971 }
972 timeout_ms =
974 }
975 else
976 {
977 /* No timeout for me. */
979 timeout_ms = -1;
980 }
981 }
982
983#ifdef PGAIO_WORKER_SHOW_PS_INFO
984 sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
986 set_ps_display(cmd);
987#endif
988
992 {
993 /* WL_TIMEOUT */
996 break;
997 }
998 else
999 {
1000 /* WL_LATCH_SET */
1002 {
1003 hist_wakeups /= 2;
1004 hist_ios /= 2;
1005 }
1006 }
1008 }
1009
1011
1013 {
1014 ConfigReloadPending = false;
1016
1017 /* If io_max_workers has been decreased, exit highest first. */
1019 break;
1020 }
1021 }
1022
1023 error_context_stack = errcallback.previous;
1024 proc_exit(0);
1025}
1026
1027bool
1029{
1030 return io_method == IOMETHOD_WORKER;
1031}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition aio.c:528
int io_method
Definition aio.c:74
int pgaio_io_get_id(PgAioHandle *ioh)
Definition aio.c:342
PgAioCtl * pgaio_ctl
Definition aio.c:78
void pgaio_io_prepare_submit(PgAioHandle *ioh)
Definition aio.c:510
@ IOMETHOD_WORKER
Definition aio.h:35
@ PGAIO_HF_REFERENCES_LOCAL
Definition aio.h:60
#define pgaio_debug(elevel, msg,...)
#define pgaio_debug_io(elevel, ioh, msg,...)
#define PGAIO_SUBMIT_BATCH_SIZE
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition aio_io.c:116
const char * pgaio_io_get_op_name(PgAioHandle *ioh)
Definition aio_io.c:175
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
Definition aio_io.c:219
void pgaio_io_reopen(PgAioHandle *ioh)
Definition aio_target.c:116
bool pgaio_io_can_reopen(PgAioHandle *ioh)
Definition aio_target.c:103
char * pgaio_io_get_target_description(PgAioHandle *ioh)
Definition aio_target.c:84
#define pg_memory_barrier()
Definition atomics.h:141
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:41
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define Assert(condition)
Definition c.h:943
#define FLEXIBLE_ARRAY_MEMBER
Definition c.h:558
int32_t int32
Definition c.h:620
uint64_t uint64
Definition c.h:625
uint16_t uint16
Definition c.h:623
uint32_t uint32
Definition c.h:624
uint32 result
int64 TimestampTz
Definition timestamp.h:39
Datum arg
Definition elog.c:1322
void EmitErrorReport(void)
Definition elog.c:1882
ErrorContextCallback * error_context_stack
Definition elog.c:99
sigjmp_buf * PG_exception_stack
Definition elog.c:101
#define errcontext
Definition elog.h:200
#define DEBUG3
Definition elog.h:29
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define DEBUG4
Definition elog.h:28
ProcNumber MyProcNumber
Definition globals.c:92
bool IsUnderPostmaster
Definition globals.c:122
struct Latch * MyLatch
Definition globals.c:65
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition interrupt.c:104
volatile sig_atomic_t ShutdownRequestPending
Definition interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1929
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockReleaseAll(void)
Definition lwlock.c:1866
bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1321
@ LW_SHARED
Definition lwlock.h:105
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pfree(void *pointer)
Definition mcxt.c:1616
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition memdebug.h:28
bool pgaio_worker_pm_test_grow_signal_sent(void)
static int pgaio_workerset_get_lowest(PgAioWorkerSet *set)
#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE
#define PGAIO_WORKERSET_BITS
static void pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
static uint32 pgaio_worker_submission_queue_depth(void)
int io_worker_launch_interval
static void pgaio_workerset_all(PgAioWorkerSet *set)
static void pgaio_worker_error_callback(void *arg)
static void pgaio_worker_request_grow(void)
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static bool pgaio_worker_can_timeout(void)
bool pgaio_worker_pm_test_grow(void)
static void pgaio_worker_shmem_request(void *arg)
static void pgaio_worker_cancel_grow(void)
static void pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
static int pgaio_worker_choose_idle(int only_workers_above)
int io_min_workers
static int pgaio_workerset_get_highest(PgAioWorkerSet *set)
static int pgaio_workerset_pop_lowest(PgAioWorkerSet *set)
uint64 PgAioWorkerSet
static int io_worker_queue_size
static void pgaio_worker_wake(int worker)
int io_max_workers
static void pgaio_worker_register(void)
static bool pgaio_workerset_is_empty(PgAioWorkerSet *set)
static PgAioWorkerControl * io_worker_control
static void pgaio_workerset_wake(PgAioWorkerSet workerset)
static int MyIoWorkerId
int io_worker_idle_timeout
const IoMethodOps pgaio_worker_ops
static void pgaio_worker_die(int code, Datum arg)
static void pgaio_worker_shmem_init(void *arg)
static int pgaio_worker_submission_queue_consume(void)
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
bool pgaio_workers_enabled(void)
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
static void pgaio_workerset_initialize(PgAioWorkerSet *set)
void IoWorkerMain(const void *startup_data, size_t startup_data_len)
static PgAioWorkerSet pgaio_workerset_singleton(int worker)
static void pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
void pgaio_worker_pm_clear_grow_signal_sent(void)
static void pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
@ B_IO_WORKER
Definition miscadmin.h:376
#define END_CRIT_SECTION()
Definition miscadmin.h:154
BackendType MyBackendType
Definition miscinit.c:65
static int pg_rightmost_one_pos64(uint64 word)
static uint32 pg_nextpower2_32(uint32 num)
static int pg_popcount64(uint64 word)
static int pg_leftmost_one_pos64(uint64 word)
Definition pg_bitutils.h:72
#define die(msg)
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:164
@ PMSIGNAL_IO_WORKER_GROW
Definition pmsignal.h:41
#define pqsignal
Definition port.h:547
#define sprintf
Definition port.h:262
uint64_t Datum
Definition postgres.h:70
static int fb(int x)
#define MAX_IO_WORKERS
Definition proc.h:526
#define GetPGProcByNumber(n)
Definition proc.h:504
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:688
static void set_ps_display(const char *activity)
Definition ps_status.h:40
#define ShmemRequestStruct(...)
Definition shmem.h:176
struct ErrorContextCallback * previous
Definition elog.h:299
void(* callback)(void *arg)
Definition elog.h:300
ShmemCallbacks shmem_callbacks
Definition proc.h:179
PgAioHandle * io_handles
PgAioWorkerSet idle_workerset
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
PgAioWorkerSet workerset
ProcNumber proc_number
int sqes[FLEXIBLE_ARRAY_MEMBER]
ShmemRequestCallback request_fn
Definition shmem.h:133
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
const char * description
const char * name
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171