PostgreSQL Source Code git master
Loading...
Searching...
No Matches
io_worker.h File Reference
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Functions

pg_noreturn void IoWorkerMain (const void *startup_data, size_t startup_data_len)
 
bool pgaio_worker_pm_test_grow_signal_sent (void)
 
void pgaio_worker_pm_clear_grow_signal_sent (void)
 
bool pgaio_worker_pm_test_grow (void)
 

Variables

PGDLLIMPORT int io_min_workers
 
PGDLLIMPORT int io_max_workers
 
PGDLLIMPORT int io_worker_idle_timeout
 
PGDLLIMPORT int io_worker_launch_interval
 

Function Documentation

◆ IoWorkerMain()

pg_noreturn void IoWorkerMain ( const void *startup_data,
size_t  startup_data_len 
)
extern

Definition at line 666 of file method_worker.c.

667{
670 int timeout_guc_used = 0;
671 PgAioHandle *volatile error_ioh = NULL;
672 ErrorContextCallback errcallback = {0};
673 volatile int error_errno = 0;
674 char cmd[128];
675 int hist_ios = 0;
676 int hist_wakeups = 0;
677
679
681 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
682
683 /*
684 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
685 * shutdown sequence, similar to checkpointer.
686 */
688 /* SIGQUIT handler was already set up by InitPostmasterChild */
693
694 /* also registers a shutdown callback to unregister */
696
697 sprintf(cmd, "%d", MyIoWorkerId);
698 set_ps_display(cmd);
699
701 errcallback.previous = error_context_stack;
702 error_context_stack = &errcallback;
703
704 /* see PostgresMain() */
705 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
706 {
709
711
712 /*
713 * In the - very unlikely - case that the IO failed in a way that
714 * raises an error we need to mark the IO as failed.
715 *
716 * Need to do just enough error recovery so that we can mark the IO as
717 * failed and then exit (postmaster will start a new worker).
718 */
720
721 if (error_ioh != NULL)
722 {
723 /* should never fail without setting error_errno */
724 Assert(error_errno != 0);
725
727
731 }
732
733 proc_exit(1);
734 }
735
736 /* We can now handle ereport(ERROR) */
738
740
742 {
744 int worker = -1;
745 int queue_depth = 0;
746 bool maybe_grow = false;
747
748 /*
749 * Try to get a job to do.
750 *
751 * The lwlock acquisition also provides the necessary memory barrier
752 * to ensure that we don't see an outdated data in the handle.
753 */
756 {
757 /* Nothing to do. Mark self idle. */
760 }
761 else
762 {
763 /* Got one. Clear idle flag. */
766
767 /*
768 * See if we should wake up a higher numbered peer. Only do that
769 * if this worker is not receiving spurious wakeups itself. The
770 * intention is create a frontier beyond which idle workers stay
771 * asleep.
772 *
773 * This heuristic tries to discover the useful wakeup propagation
774 * chain length when IOs are very fast and workers wake up to find
775 * that all IOs have already been taken.
776 *
777 * If we chose not to wake a worker when we ideally should have,
778 * then the ratio will soon change to correct that.
779 */
780 if (hist_wakeups <= hist_ios)
781 {
783 if (queue_depth > 0)
784 {
785 /* Choose a worker higher than me to wake. */
787 if (worker == -1)
788 maybe_grow = true;
789 }
790 }
791 }
793
794 /* Propagate wakeups. */
795 if (worker != -1)
796 {
797 pgaio_worker_wake(worker);
798 }
799 else if (maybe_grow)
800 {
801 /*
802 * We know there was at least one more item in the queue, and we
803 * failed to find a higher-numbered idle worker to wake. Now we
804 * decide if we should try to start one more worker.
805 *
806 * We do this with a simple heuristic: is the queue depth greater
807 * than the current number of workers?
808 *
809 * Consider the following situations:
810 *
811 * 1. The queue depth is constantly increasing, because IOs are
812 * arriving faster than they can possibly be serviced. It doesn't
813 * matter much which threshold we choose, as we will surely hit
814 * it. Crossing the current worker count is a useful signal
815 * because it's clearly too deep to avoid queuing latency already,
816 * but still leaves a small window of opportunity to improve the
817 * situation before the queue overflows.
818 *
819 * 2. The worker pool is keeping up, no latency is being
820 * introduced and an extra worker would be a waste of resources.
821 * Queue depth distributions tend to be heavily skewed, with long
822 * tails of low probability spikes (due to submission clustering,
823 * scheduling, jitter, stalls, noisy neighbors, etc). We want a
824 * number that is very unlikely to be triggered by an outlier, and
825 * we bet that an exponential or similar distribution whose
826 * outliers never reach this threshold must be almost entirely
827 * concentrated at the low end. If we do see a spike as big as
828 * the worker count, we take it as a signal that the distribution
829 * is surely too wide.
830 *
831 * On its own, this is an extremely crude signal. When combined
832 * with the wakeup propagation test that precedes it (but on its
833 * own tends to overshoot) and io_worker_launch_interval, the result
834 * is that we gradually test each pool size until we find one that
835 * doesn't trigger further expansion, and then hold it for at
836 * least io_worker_idle_timeout.
837 *
838 * XXX Perhaps ideas from queueing theory or control theory could
839 * do a better job of this.
840 */
841
842 /* Read nworkers without lock for this heuristic purpose. */
845 }
846
847 if (io_index != -1)
848 {
850
851 /* Cancel timeout and update wakeup:work ratio. */
854 {
855 hist_wakeups /= 2;
856 hist_ios /= 2;
857 }
858
860 error_ioh = ioh;
861 errcallback.arg = ioh;
862
864 "worker %d processing IO",
866
867 /*
868 * Prevent interrupts between pgaio_io_reopen() and
869 * pgaio_io_perform_synchronously() that otherwise could lead to
870 * the FD getting closed in that window.
871 */
873
874 /*
875 * It's very unlikely, but possible, that reopen fails. E.g. due
876 * to memory allocations failing or file permissions changing or
877 * such. In that case we need to fail the IO.
878 *
879 * There's not really a good errno we can report here.
880 */
883
884 /*
885 * To be able to exercise the reopen-fails path, allow injection
886 * points to trigger a failure at this point.
887 */
888 INJECTION_POINT("aio-worker-after-reopen", ioh);
889
890 error_errno = 0;
891 error_ioh = NULL;
892
893 /*
894 * As part of IO completion the buffer will be marked as NOACCESS,
895 * until the buffer is pinned again - which never happens in io
896 * workers. Therefore the next time there is IO for the same
897 * buffer, the memory will be considered inaccessible. To avoid
898 * that, explicitly allow access to the memory before reading data
899 * into it.
900 */
901#ifdef USE_VALGRIND
902 {
903 struct iovec *iov;
904 uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
905
906 for (int i = 0; i < iov_length; i++)
908 }
909#endif
910
911#ifdef PGAIO_WORKER_SHOW_PS_INFO
912 {
914
915 sprintf(cmd, "%d: [%s] %s",
920 set_ps_display(cmd);
921 }
922#endif
923
924 /*
925 * We don't expect this to ever fail with ERROR or FATAL, no need
926 * to keep error_ioh set to the IO.
927 * pgaio_io_perform_synchronously() contains a critical section to
928 * ensure we don't accidentally fail.
929 */
931
933 errcallback.arg = NULL;
934 }
935 else
936 {
937 int timeout_ms;
938
939 /* Cancel new worker request if pending. */
941
942 /* Compute the remaining allowed idle time. */
943 if (io_worker_idle_timeout == -1)
944 {
945 /* Never time out. */
946 timeout_ms = -1;
947 }
948 else
949 {
951
952 /* If the GUC changes, reset timer. */
953 if (idle_timeout_abs != 0 &&
956
957 /* Only the highest-numbered worker can time out. */
959 {
960 if (idle_timeout_abs == 0)
961 {
962 /*
963 * I have just been promoted to the timeout worker, or
964 * the GUC changed. Compute new absolute time from
965 * now.
966 */
971 }
972 timeout_ms =
974 }
975 else
976 {
977 /* No timeout for me. */
979 timeout_ms = -1;
980 }
981 }
982
983#ifdef PGAIO_WORKER_SHOW_PS_INFO
984 sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
986 set_ps_display(cmd);
987#endif
988
992 {
993 /* WL_TIMEOUT */
996 break;
997 }
998 else
999 {
1000 /* WL_LATCH_SET */
1002 {
1003 hist_wakeups /= 2;
1004 hist_ios /= 2;
1005 }
1006 }
1008 }
1009
1011
1013 {
1014 ConfigReloadPending = false;
1016
1017 /* If io_max_workers has been decreased, exit highest first. */
1019 break;
1020 }
1021 }
1022
1023 error_context_stack = errcallback.previous;
1024 proc_exit(0);
1025}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition aio.c:528
PgAioCtl * pgaio_ctl
Definition aio.c:78
#define pgaio_debug_io(elevel, ioh, msg,...)
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition aio_io.c:116
const char * pgaio_io_get_op_name(PgAioHandle *ioh)
Definition aio_io.c:175
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
Definition aio_io.c:219
void pgaio_io_reopen(PgAioHandle *ioh)
Definition aio_target.c:116
char * pgaio_io_get_target_description(PgAioHandle *ioh)
Definition aio_target.c:84
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:41
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define Assert(condition)
Definition c.h:943
uint16_t uint16
Definition c.h:623
uint32_t uint32
Definition c.h:624
int64 TimestampTz
Definition timestamp.h:39
void EmitErrorReport(void)
Definition elog.c:1882
ErrorContextCallback * error_context_stack
Definition elog.c:99
sigjmp_buf * PG_exception_stack
Definition elog.c:101
#define DEBUG4
Definition elog.h:28
struct Latch * MyLatch
Definition globals.c:65
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition interrupt.c:104
volatile sig_atomic_t ShutdownRequestPending
Definition interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockReleaseAll(void)
Definition lwlock.c:1866
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pfree(void *pointer)
Definition mcxt.c:1616
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition memdebug.h:28
#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
static void pgaio_worker_request_grow(void)
static bool pgaio_worker_can_timeout(void)
static void pgaio_worker_cancel_grow(void)
static void pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
static int pgaio_worker_choose_idle(int only_workers_above)
static void pgaio_worker_wake(int worker)
int io_max_workers
static void pgaio_worker_register(void)
static PgAioWorkerControl * io_worker_control
static int MyIoWorkerId
int io_worker_idle_timeout
static int pgaio_worker_submission_queue_consume(void)
static void pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
#define END_CRIT_SECTION()
Definition miscadmin.h:154
#define die(msg)
#define pqsignal
Definition port.h:547
#define sprintf
Definition port.h:262
static int fb(int x)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:688
static void set_ps_display(const char *activity)
Definition ps_status.h:40
struct ErrorContextCallback * previous
Definition elog.h:299
void(* callback)(void *arg)
Definition elog.h:300
PgAioHandle * io_handles
PgAioWorkerSet idle_workerset
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
const char * description
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171

References ErrorContextCallback::arg, Assert, AuxiliaryProcessMainCommon(), ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG4, description, die, EmitErrorReport(), END_CRIT_SECTION, error_context_stack, fb(), GetCurrentTimestamp(), HOLD_INTERRUPTS, i, PgAioWorkerControl::idle_workerset, INJECTION_POINT, PgAioCtl::io_handles, io_max_workers, io_worker_control, io_worker_idle_timeout, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWLockReleaseAll(), MyIoWorkerId, MyLatch, now(), PgAioWorkerControl::nworkers, pfree(), PG_exception_stack, pgaio_ctl, pgaio_debug_io, pgaio_io_get_iovec_length(), pgaio_io_get_op_name(), pgaio_io_get_target_description(), pgaio_io_perform_synchronously(), pgaio_io_process_completion(), pgaio_io_reopen(), pgaio_worker_can_timeout(), pgaio_worker_cancel_grow(), pgaio_worker_choose_idle(), pgaio_worker_error_callback(), pgaio_worker_register(), pgaio_worker_request_grow(), pgaio_worker_submission_queue_consume(), pgaio_worker_submission_queue_depth(), pgaio_worker_wake(), PGAIO_WORKER_WAKEUP_RATIO_SATURATE, pgaio_workerset_insert(), pgaio_workerset_remove(), PGC_SIGHUP, pqsignal, ErrorContextCallback::previous, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), ResetLatch(), RESUME_INTERRUPTS, set_ps_display(), ShutdownRequestPending, SIGALRM, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, sprintf, START_CRIT_SECTION, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, UnBlockSig, VALGRIND_MAKE_MEM_UNDEFINED, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

◆ pgaio_worker_pm_clear_grow_signal_sent()

void pgaio_worker_pm_clear_grow_signal_sent ( void  )
extern

Definition at line 349 of file method_worker.c.

350{
352 return;
353
354 io_worker_control->grow = false;
357}
#define pg_memory_barrier()
Definition atomics.h:141

References PgAioWorkerControl::grow, PgAioWorkerControl::grow_signal_sent, io_worker_control, and pg_memory_barrier.

Referenced by maybe_start_io_workers().

◆ pgaio_worker_pm_test_grow()

bool pgaio_worker_pm_test_grow ( void  )
extern

Definition at line 339 of file method_worker.c.

340{
343}

References PgAioWorkerControl::grow, io_worker_control, and pg_memory_barrier.

Referenced by maybe_start_io_workers().

◆ pgaio_worker_pm_test_grow_signal_sent()

bool pgaio_worker_pm_test_grow_signal_sent ( void  )
extern

Variable Documentation

◆ io_max_workers

PGDLLIMPORT int io_max_workers
extern

◆ io_min_workers

PGDLLIMPORT int io_min_workers
extern

◆ io_worker_idle_timeout

PGDLLIMPORT int io_worker_idle_timeout
extern

Definition at line 133 of file method_worker.c.

Referenced by IoWorkerMain().

◆ io_worker_launch_interval

PGDLLIMPORT int io_worker_launch_interval
extern

Definition at line 134 of file method_worker.c.

Referenced by maybe_start_io_workers().