PostgreSQL Source Code git master
Loading...
Searching...
No Matches
method_worker.c File Reference
#include "postgres.h"
#include <limits.h>
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
#include "utils/memdebug.h"
#include "utils/ps_status.h"
#include "utils/wait_event.h"
Include dependency graph for method_worker.c:

Go to the source code of this file.

Data Structures

struct  PgAioWorkerSubmissionQueue
 
struct  PgAioWorkerSlot
 
struct  PgAioWorkerControl
 

Macros

#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE   4
 
#define PGAIO_WORKERSET_BITS   (sizeof(PgAioWorkerSet) * CHAR_BIT)
 

Typedefs

typedef struct PgAioWorkerSubmissionQueue PgAioWorkerSubmissionQueue
 
typedef struct PgAioWorkerSlot PgAioWorkerSlot
 
typedef uint64 PgAioWorkerSet
 
typedef struct PgAioWorkerControl PgAioWorkerControl
 

Functions

static void pgaio_worker_shmem_request (void *arg)
 
static void pgaio_worker_shmem_init (void *arg)
 
static bool pgaio_worker_needs_synchronous_execution (PgAioHandle *ioh)
 
static int pgaio_worker_submit (uint16 num_staged_ios, PgAioHandle **staged_ios)
 
static void pgaio_workerset_initialize (PgAioWorkerSet *set)
 
static bool pgaio_workerset_is_empty (PgAioWorkerSet *set)
 
static PgAioWorkerSet pgaio_workerset_singleton (int worker)
 
static void pgaio_workerset_all (PgAioWorkerSet *set)
 
static void pgaio_workerset_subtract (PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
 
static void pgaio_workerset_insert (PgAioWorkerSet *set, int worker)
 
static void pgaio_workerset_remove (PgAioWorkerSet *set, int worker)
 
static void pgaio_workerset_remove_lte (PgAioWorkerSet *set, int worker)
 
static int pgaio_workerset_get_highest (PgAioWorkerSet *set)
 
static int pgaio_workerset_get_lowest (PgAioWorkerSet *set)
 
static int pgaio_workerset_pop_lowest (PgAioWorkerSet *set)
 
static void pgaio_worker_request_grow (void)
 
static void pgaio_worker_cancel_grow (void)
 
bool pgaio_worker_pm_test_grow_signal_sent (void)
 
bool pgaio_worker_pm_test_grow (void)
 
void pgaio_worker_pm_clear_grow_signal_sent (void)
 
static int pgaio_worker_choose_idle (int only_workers_above)
 
static void pgaio_worker_wake (int worker)
 
static void pgaio_workerset_wake (PgAioWorkerSet workerset)
 
static bool pgaio_worker_submission_queue_insert (PgAioHandle *ioh)
 
static int pgaio_worker_submission_queue_consume (void)
 
static uint32 pgaio_worker_submission_queue_depth (void)
 
static void pgaio_worker_die (int code, Datum arg)
 
static void pgaio_worker_register (void)
 
static void pgaio_worker_error_callback (void *arg)
 
static bool pgaio_worker_can_timeout (void)
 
void IoWorkerMain (const void *startup_data, size_t startup_data_len)
 
bool pgaio_workers_enabled (void)
 

Variables

const IoMethodOps pgaio_worker_ops
 
int io_min_workers = 2
 
int io_max_workers = 8
 
int io_worker_idle_timeout = 60000
 
int io_worker_launch_interval = 100
 
static int io_worker_queue_size = 64
 
static int MyIoWorkerId = -1
 
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
 
static PgAioWorkerControl * io_worker_control
 

Macro Definition Documentation

◆ PGAIO_WORKER_WAKEUP_RATIO_SATURATE

#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE   4

Definition at line 66 of file method_worker.c.

◆ PGAIO_WORKERSET_BITS

#define PGAIO_WORKERSET_BITS   (sizeof(PgAioWorkerSet) * CHAR_BIT)

Definition at line 92 of file method_worker.c.

Typedef Documentation

◆ PgAioWorkerControl

◆ PgAioWorkerSet

Definition at line 90 of file method_worker.c.

◆ PgAioWorkerSlot

◆ PgAioWorkerSubmissionQueue

Function Documentation

◆ IoWorkerMain()

void IoWorkerMain ( const void * startup_data,
size_t  startup_data_len 
)

Definition at line 666 of file method_worker.c.

667{
670 int timeout_guc_used = 0;
671 PgAioHandle *volatile error_ioh = NULL;
672 ErrorContextCallback errcallback = {0};
673 volatile int error_errno = 0;
674 char cmd[128];
675 int hist_ios = 0;
676 int hist_wakeups = 0;
677
679
681 pqsignal(SIGINT, die); /* to allow manually triggering worker restart */
682
683 /*
684 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
685 * shutdown sequence, similar to checkpointer.
686 */
688 /* SIGQUIT handler was already set up by InitPostmasterChild */
693
694 /* also registers a shutdown callback to unregister */
696
697 sprintf(cmd, "%d", MyIoWorkerId);
698 set_ps_display(cmd);
699
701 errcallback.previous = error_context_stack;
702 error_context_stack = &errcallback;
703
704 /* see PostgresMain() */
705 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
706 {
709
711
712 /*
713 * In the - very unlikely - case that the IO failed in a way that
714 * raises an error we need to mark the IO as failed.
715 *
716 * Need to do just enough error recovery so that we can mark the IO as
717 * failed and then exit (postmaster will start a new worker).
718 */
720
721 if (error_ioh != NULL)
722 {
723 /* should never fail without setting error_errno */
724 Assert(error_errno != 0);
725
727
731 }
732
733 proc_exit(1);
734 }
735
736 /* We can now handle ereport(ERROR) */
738
740
742 {
744 int worker = -1;
745 int queue_depth = 0;
746 bool maybe_grow = false;
747
748 /*
749 * Try to get a job to do.
750 *
751 * The lwlock acquisition also provides the necessary memory barrier
752 * to ensure that we don't see outdated data in the handle.
753 */
756 {
757 /* Nothing to do. Mark self idle. */
760 }
761 else
762 {
763 /* Got one. Clear idle flag. */
766
767 /*
768 * See if we should wake up a higher numbered peer. Only do that
769 * if this worker is not receiving spurious wakeups itself. The
770 * intention is to create a frontier beyond which idle workers stay
771 * asleep.
772 *
773 * This heuristic tries to discover the useful wakeup propagation
774 * chain length when IOs are very fast and workers wake up to find
775 * that all IOs have already been taken.
776 *
777 * If we chose not to wake a worker when we ideally should have,
778 * then the ratio will soon change to correct that.
779 */
780 if (hist_wakeups <= hist_ios)
781 {
783 if (queue_depth > 0)
784 {
785 /* Choose a worker higher than me to wake. */
787 if (worker == -1)
788 maybe_grow = true;
789 }
790 }
791 }
793
794 /* Propagate wakeups. */
795 if (worker != -1)
796 {
797 pgaio_worker_wake(worker);
798 }
799 else if (maybe_grow)
800 {
801 /*
802 * We know there was at least one more item in the queue, and we
803 * failed to find a higher-numbered idle worker to wake. Now we
804 * decide if we should try to start one more worker.
805 *
806 * We do this with a simple heuristic: is the queue depth greater
807 * than the current number of workers?
808 *
809 * Consider the following situations:
810 *
811 * 1. The queue depth is constantly increasing, because IOs are
812 * arriving faster than they can possibly be serviced. It doesn't
813 * matter much which threshold we choose, as we will surely hit
814 * it. Crossing the current worker count is a useful signal
815 * because it's clearly too deep to avoid queuing latency already,
816 * but still leaves a small window of opportunity to improve the
817 * situation before the queue overflows.
818 *
819 * 2. The worker pool is keeping up, no latency is being
820 * introduced and an extra worker would be a waste of resources.
821 * Queue depth distributions tend to be heavily skewed, with long
822 * tails of low probability spikes (due to submission clustering,
823 * scheduling, jitter, stalls, noisy neighbors, etc). We want a
824 * number that is very unlikely to be triggered by an outlier, and
825 * we bet that an exponential or similar distribution whose
826 * outliers never reach this threshold must be almost entirely
827 * concentrated at the low end. If we do see a spike as big as
828 * the worker count, we take it as a signal that the distribution
829 * is surely too wide.
830 *
831 * On its own, this is an extremely crude signal. When combined
832 * with the wakeup propagation test that precedes it (but on its
833 * own tends to overshoot) and io_worker_launch_interval, the result
834 * is that we gradually test each pool size until we find one that
835 * doesn't trigger further expansion, and then hold it for at
836 * least io_worker_idle_timeout.
837 *
838 * XXX Perhaps ideas from queueing theory or control theory could
839 * do a better job of this.
840 */
841
842 /* Read nworkers without lock for this heuristic purpose. */
845 }
846
847 if (io_index != -1)
848 {
850
851 /* Cancel timeout and update wakeup:work ratio. */
854 {
855 hist_wakeups /= 2;
856 hist_ios /= 2;
857 }
858
860 error_ioh = ioh;
861 errcallback.arg = ioh;
862
864 "worker %d processing IO",
866
867 /*
868 * Prevent interrupts between pgaio_io_reopen() and
869 * pgaio_io_perform_synchronously() that otherwise could lead to
870 * the FD getting closed in that window.
871 */
873
874 /*
875 * It's very unlikely, but possible, that reopen fails. E.g. due
876 * to memory allocations failing or file permissions changing or
877 * such. In that case we need to fail the IO.
878 *
879 * There's not really a good errno we can report here.
880 */
883
884 /*
885 * To be able to exercise the reopen-fails path, allow injection
886 * points to trigger a failure at this point.
887 */
888 INJECTION_POINT("aio-worker-after-reopen", ioh);
889
890 error_errno = 0;
891 error_ioh = NULL;
892
893 /*
894 * As part of IO completion the buffer will be marked as NOACCESS,
895 * until the buffer is pinned again - which never happens in io
896 * workers. Therefore the next time there is IO for the same
897 * buffer, the memory will be considered inaccessible. To avoid
898 * that, explicitly allow access to the memory before reading data
899 * into it.
900 */
901#ifdef USE_VALGRIND
902 {
903 struct iovec *iov;
904 uint16 iov_length = pgaio_io_get_iovec_length(ioh, &iov);
905
906 for (int i = 0; i < iov_length; i++)
908 }
909#endif
910
911#ifdef PGAIO_WORKER_SHOW_PS_INFO
912 {
914
915 sprintf(cmd, "%d: [%s] %s",
920 set_ps_display(cmd);
921 }
922#endif
923
924 /*
925 * We don't expect this to ever fail with ERROR or FATAL, no need
926 * to keep error_ioh set to the IO.
927 * pgaio_io_perform_synchronously() contains a critical section to
928 * ensure we don't accidentally fail.
929 */
931
933 errcallback.arg = NULL;
934 }
935 else
936 {
937 int timeout_ms;
938
939 /* Cancel new worker request if pending. */
941
942 /* Compute the remaining allowed idle time. */
943 if (io_worker_idle_timeout == -1)
944 {
945 /* Never time out. */
946 timeout_ms = -1;
947 }
948 else
949 {
951
952 /* If the GUC changes, reset timer. */
953 if (idle_timeout_abs != 0 &&
956
957 /* Only the highest-numbered worker can time out. */
959 {
960 if (idle_timeout_abs == 0)
961 {
962 /*
963 * I have just been promoted to the timeout worker, or
964 * the GUC changed. Compute new absolute time from
965 * now.
966 */
971 }
972 timeout_ms =
974 }
975 else
976 {
977 /* No timeout for me. */
979 timeout_ms = -1;
980 }
981 }
982
983#ifdef PGAIO_WORKER_SHOW_PS_INFO
984 sprintf(cmd, "%d: idle, wakeups:ios = %d:%d",
986 set_ps_display(cmd);
987#endif
988
992 {
993 /* WL_TIMEOUT */
996 break;
997 }
998 else
999 {
1000 /* WL_LATCH_SET */
1002 {
1003 hist_wakeups /= 2;
1004 hist_ios /= 2;
1005 }
1006 }
1008 }
1009
1011
1013 {
1014 ConfigReloadPending = false;
1016
1017 /* If io_max_workers has been decreased, exit highest first. */
1019 break;
1020 }
1021 }
1022
1023 error_context_stack = errcallback.previous;
1024 proc_exit(0);
1025}
void pgaio_io_process_completion(PgAioHandle *ioh, int result)
Definition aio.c:528
PgAioCtl * pgaio_ctl
Definition aio.c:78
#define pgaio_debug_io(elevel, ioh, msg,...)
void pgaio_io_perform_synchronously(PgAioHandle *ioh)
Definition aio_io.c:116
const char * pgaio_io_get_op_name(PgAioHandle *ioh)
Definition aio_io.c:175
int pgaio_io_get_iovec_length(PgAioHandle *ioh, struct iovec **iov)
Definition aio_io.c:219
void pgaio_io_reopen(PgAioHandle *ioh)
Definition aio_target.c:116
char * pgaio_io_get_target_description(PgAioHandle *ioh)
Definition aio_target.c:84
void AuxiliaryProcessMainCommon(void)
Definition auxprocess.c:41
sigset_t UnBlockSig
Definition pqsignal.c:22
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1751
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1639
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1603
#define Assert(condition)
Definition c.h:943
uint16_t uint16
Definition c.h:623
uint32_t uint32
Definition c.h:624
int64 TimestampTz
Definition timestamp.h:39
void EmitErrorReport(void)
Definition elog.c:1882
ErrorContextCallback * error_context_stack
Definition elog.c:99
sigjmp_buf * PG_exception_stack
Definition elog.c:101
#define DEBUG4
Definition elog.h:28
struct Latch * MyLatch
Definition globals.c:65
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
#define INJECTION_POINT(name, arg)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
Definition interrupt.c:104
volatile sig_atomic_t ShutdownRequestPending
Definition interrupt.c:28
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
void proc_exit(int code)
Definition ipc.c:105
int i
Definition isn.c:77
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
void LWLockReleaseAll(void)
Definition lwlock.c:1866
@ LW_EXCLUSIVE
Definition lwlock.h:104
void pfree(void *pointer)
Definition mcxt.c:1616
#define VALGRIND_MAKE_MEM_UNDEFINED(addr, size)
Definition memdebug.h:28
#define PGAIO_WORKER_WAKEUP_RATIO_SATURATE
static uint32 pgaio_worker_submission_queue_depth(void)
static void pgaio_worker_error_callback(void *arg)
static void pgaio_worker_request_grow(void)
static bool pgaio_worker_can_timeout(void)
static void pgaio_worker_cancel_grow(void)
static void pgaio_workerset_remove(PgAioWorkerSet *set, int worker)
static int pgaio_worker_choose_idle(int only_workers_above)
static void pgaio_worker_wake(int worker)
int io_max_workers
static void pgaio_worker_register(void)
static PgAioWorkerControl * io_worker_control
static int MyIoWorkerId
int io_worker_idle_timeout
static int pgaio_worker_submission_queue_consume(void)
static void pgaio_workerset_insert(PgAioWorkerSet *set, int worker)
#define RESUME_INTERRUPTS()
Definition miscadmin.h:138
#define START_CRIT_SECTION()
Definition miscadmin.h:152
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:125
#define HOLD_INTERRUPTS()
Definition miscadmin.h:136
#define END_CRIT_SECTION()
Definition miscadmin.h:154
#define die(msg)
#define pqsignal
Definition port.h:547
#define sprintf
Definition port.h:262
static int fb(int x)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:688
static void set_ps_display(const char *activity)
Definition ps_status.h:40
struct ErrorContextCallback * previous
Definition elog.h:299
void(* callback)(void *arg)
Definition elog.h:300
PgAioHandle * io_handles
PgAioWorkerSet idle_workerset
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
const char * description
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGALRM
Definition win32_port.h:164
#define SIGUSR2
Definition win32_port.h:171

References ErrorContextCallback::arg, Assert, AuxiliaryProcessMainCommon(), ErrorContextCallback::callback, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG4, description, die, EmitErrorReport(), END_CRIT_SECTION, error_context_stack, fb(), GetCurrentTimestamp(), HOLD_INTERRUPTS, i, PgAioWorkerControl::idle_workerset, INJECTION_POINT, PgAioCtl::io_handles, io_max_workers, io_worker_control, io_worker_idle_timeout, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), LWLockReleaseAll(), MyIoWorkerId, MyLatch, now(), PgAioWorkerControl::nworkers, pfree(), PG_exception_stack, pgaio_ctl, pgaio_debug_io, pgaio_io_get_iovec_length(), pgaio_io_get_op_name(), pgaio_io_get_target_description(), pgaio_io_perform_synchronously(), pgaio_io_process_completion(), pgaio_io_reopen(), pgaio_worker_can_timeout(), pgaio_worker_cancel_grow(), pgaio_worker_choose_idle(), pgaio_worker_error_callback(), pgaio_worker_register(), pgaio_worker_request_grow(), pgaio_worker_submission_queue_consume(), pgaio_worker_submission_queue_depth(), pgaio_worker_wake(), PGAIO_WORKER_WAKEUP_RATIO_SATURATE, pgaio_workerset_insert(), pgaio_workerset_remove(), PGC_SIGHUP, pqsignal, ErrorContextCallback::previous, proc_exit(), ProcessConfigFile(), procsignal_sigusr1_handler(), ResetLatch(), RESUME_INTERRUPTS, set_ps_display(), ShutdownRequestPending, SIGALRM, SIGHUP, SignalHandlerForConfigReload(), SignalHandlerForShutdownRequest(), SIGPIPE, SIGUSR1, SIGUSR2, sprintf, START_CRIT_SECTION, TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, UnBlockSig, VALGRIND_MAKE_MEM_UNDEFINED, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

◆ pgaio_worker_can_timeout()

static bool pgaio_worker_can_timeout ( void  )
static

Definition at line 647 of file method_worker.c.

648{
649 PgAioWorkerSet workerset;
650
652 return false;
653
654 /* Serialize against pool size changes. */
656 workerset = io_worker_control->workerset;
658
659 if (MyIoWorkerId != pgaio_workerset_get_highest(&workerset))
660 return false;
661
662 return true;
663}
@ LW_SHARED
Definition lwlock.h:105
int io_min_workers
static int pgaio_workerset_get_highest(PgAioWorkerSet *set)
uint64 PgAioWorkerSet
PgAioWorkerSet workerset

References fb(), io_min_workers, io_worker_control, LW_SHARED, LWLockAcquire(), LWLockRelease(), MyIoWorkerId, pgaio_workerset_get_highest(), and PgAioWorkerControl::workerset.

Referenced by IoWorkerMain().

◆ pgaio_worker_cancel_grow()

static void pgaio_worker_cancel_grow ( void  )
static

Definition at line 314 of file method_worker.c.

315{
317 return;
318
319 io_worker_control->grow = false;
321}
#define pg_memory_barrier()
Definition atomics.h:141

References PgAioWorkerControl::grow, io_worker_control, and pg_memory_barrier.

Referenced by IoWorkerMain().

◆ pgaio_worker_choose_idle()

static int pgaio_worker_choose_idle ( int  only_workers_above)
static

Definition at line 360 of file method_worker.c.

361{
362 PgAioWorkerSet workerset;
363 int worker;
364
366
368 if (only_workers_above >= 0)
370 if (pgaio_workerset_is_empty(&workerset))
371 return -1;
372
373 /* Find the lowest numbered idle worker and mark it not idle. */
374 worker = pgaio_workerset_get_lowest(&workerset);
376
377 return worker;
378}
bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1929
static int pgaio_workerset_get_lowest(PgAioWorkerSet *set)
static void pgaio_workerset_remove_lte(PgAioWorkerSet *set, int worker)
static bool pgaio_workerset_is_empty(PgAioWorkerSet *set)

References Assert, fb(), PgAioWorkerControl::idle_workerset, io_worker_control, LW_EXCLUSIVE, LWLockHeldByMeInMode(), pgaio_workerset_get_lowest(), pgaio_workerset_is_empty(), pgaio_workerset_remove(), and pgaio_workerset_remove_lte().

Referenced by IoWorkerMain(), and pgaio_worker_submit().

◆ pgaio_worker_die()

static void pgaio_worker_die ( int  code,
Datum  arg 
)
static

Definition at line 543 of file method_worker.c.

544{
546
550
562
563 /*
564 * Notify other workers on pool change. This allows the new highest
565 * worker to know that it is now the one that can time out, and closes a
566 * wakeup-loss race described in pgaio_worker_wake().
567 */
569}
ProcNumber MyProcNumber
Definition globals.c:92
static void pgaio_workerset_wake(PgAioWorkerSet workerset)
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]
ProcNumber proc_number

References Assert, fb(), PgAioWorkerControl::idle_workerset, INVALID_PROC_NUMBER, io_worker_control, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MyIoWorkerId, MyProcNumber, PgAioWorkerControl::nworkers, pgaio_workerset_remove(), pgaio_workerset_wake(), PgAioWorkerSlot::proc_number, PgAioWorkerControl::workers, and PgAioWorkerControl::workerset.

Referenced by pgaio_worker_register().

◆ pgaio_worker_error_callback()

static void pgaio_worker_error_callback ( void * arg)
static

Definition at line 615 of file method_worker.c.

616{
617 ProcNumber owner;
619 int32 owner_pid;
621
622 if (!ioh)
623 return;
624
625 Assert(ioh->owner_procno != MyProcNumber);
627
628 owner = ioh->owner_procno;
630 owner_pid = owner_proc->pid;
631
632 errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
633}
int32_t int32
Definition c.h:620
Datum arg
Definition elog.c:1322
#define errcontext
Definition elog.h:200
@ B_IO_WORKER
Definition miscadmin.h:376
BackendType MyBackendType
Definition miscinit.c:65
#define GetPGProcByNumber(n)
Definition proc.h:504
int ProcNumber
Definition procnumber.h:24
Definition proc.h:179

References arg, Assert, B_IO_WORKER, errcontext, fb(), GetPGProcByNumber, MyBackendType, and MyProcNumber.

Referenced by IoWorkerMain().

◆ pgaio_worker_needs_synchronous_execution()

static bool pgaio_worker_needs_synchronous_execution ( PgAioHandle * ioh)
static

Definition at line 473 of file method_worker.c.

474{
475 return
479}
@ PGAIO_HF_REFERENCES_LOCAL
Definition aio.h:60
bool pgaio_io_can_reopen(PgAioHandle *ioh)
Definition aio_target.c:103
bool IsUnderPostmaster
Definition globals.c:122

References fb(), IsUnderPostmaster, PGAIO_HF_REFERENCES_LOCAL, and pgaio_io_can_reopen().

Referenced by pgaio_worker_submit().

◆ pgaio_worker_pm_clear_grow_signal_sent()

void pgaio_worker_pm_clear_grow_signal_sent ( void  )

◆ pgaio_worker_pm_test_grow()

bool pgaio_worker_pm_test_grow ( void  )

Definition at line 339 of file method_worker.c.

340{
343}

References PgAioWorkerControl::grow, io_worker_control, and pg_memory_barrier.

Referenced by maybe_start_io_workers().

◆ pgaio_worker_pm_test_grow_signal_sent()

bool pgaio_worker_pm_test_grow_signal_sent ( void  )

◆ pgaio_worker_register()

static void pgaio_worker_register ( void  )
static

Definition at line 576 of file method_worker.c.

577{
580
581 MyIoWorkerId = -1;
582
584 /* Find lowest unused worker ID. */
589 if (MyIoWorkerId == -1)
590 elog(ERROR, "couldn't find a free worker ID");
591
595
604
605 /*
606 * Notify other workers on pool change. If we were the highest worker,
607 * this allows the new highest worker to know that it can time out.
608 */
610
612}
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
static void pgaio_workerset_all(PgAioWorkerSet *set)
static void pgaio_worker_die(int code, Datum arg)
static void pgaio_workerset_subtract(PgAioWorkerSet *set1, const PgAioWorkerSet *set2)
#define MAX_IO_WORKERS
Definition proc.h:526

References Assert, elog, ERROR, fb(), INVALID_PROC_NUMBER, io_worker_control, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MAX_IO_WORKERS, MyIoWorkerId, MyProcNumber, PgAioWorkerControl::nworkers, on_shmem_exit(), pgaio_worker_die(), pgaio_workerset_all(), pgaio_workerset_get_lowest(), pgaio_workerset_insert(), pgaio_workerset_is_empty(), pgaio_workerset_subtract(), pgaio_workerset_wake(), PgAioWorkerSlot::proc_number, PgAioWorkerControl::workers, and PgAioWorkerControl::workerset.

Referenced by IoWorkerMain().

◆ pgaio_worker_request_grow()

static void pgaio_worker_request_grow ( void  )
static

Definition at line 278 of file method_worker.c.

279{
280 /*
281 * Suppress useless signaling if we already know that we're at the
282 * maximum. This uses an unlocked read of nworkers, but that's OK for
283 * this heuristic purpose.
284 */
286 return;
287
288 /* Already requested? */
290 return;
291
292 io_worker_control->grow = true;
294
295 /*
296 * If the postmaster has already been signaled, don't do it again until
297 * the postmaster clears this flag. There is no point in repeated signals
298 * if grow is being set and cleared repeatedly while the postmaster is
299 * waiting for io_worker_launch_interval, which it applies even to
300 * canceled requests.
301 */
303 return;
304
308}
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:164
@ PMSIGNAL_IO_WORKER_GROW
Definition pmsignal.h:41

References PgAioWorkerControl::grow, PgAioWorkerControl::grow_signal_sent, io_max_workers, io_worker_control, PgAioWorkerControl::nworkers, pg_memory_barrier, PMSIGNAL_IO_WORKER_GROW, and SendPostmasterSignal().

Referenced by IoWorkerMain().

◆ pgaio_worker_shmem_init()

static void pgaio_worker_shmem_init ( void * arg)
static

Definition at line 256 of file method_worker.c.

257{
258 int queue_size;
259
260 /* Round size up like in pgaio_worker_shmem_request() */
262
263 io_worker_submission_queue->size = queue_size;
266 io_worker_control->grow = false;
269
270 for (int i = 0; i < MAX_IO_WORKERS; ++i)
272}
static int io_worker_queue_size
static PgAioWorkerSubmissionQueue * io_worker_submission_queue
static void pgaio_workerset_initialize(PgAioWorkerSet *set)
static uint32 pg_nextpower2_32(uint32 num)

References PgAioWorkerControl::grow, PgAioWorkerSubmissionQueue::head, i, PgAioWorkerControl::idle_workerset, INVALID_PROC_NUMBER, io_worker_control, io_worker_queue_size, io_worker_submission_queue, MAX_IO_WORKERS, pg_nextpower2_32(), pgaio_workerset_initialize(), PgAioWorkerSlot::proc_number, PgAioWorkerSubmissionQueue::size, PgAioWorkerSubmissionQueue::tail, PgAioWorkerControl::workers, and PgAioWorkerControl::workerset.

◆ pgaio_worker_shmem_request()

static void pgaio_worker_shmem_request ( void * arg)
static

Definition at line 234 of file method_worker.c.

235{
236 size_t size;
237 int queue_size;
238
239 /* Round size up to next power of two so we can make a mask. */
241
242 size = offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(int) * queue_size;
243 ShmemRequestStruct(.name = "AioWorkerSubmissionQueue",
244 .size = size,
245 .ptr = (void **) &io_worker_submission_queue,
246 );
247
248 size = offsetof(PgAioWorkerControl, workers) + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
249 ShmemRequestStruct(.name = "AioWorkerControl",
250 .size = size,
251 .ptr = (void **) &io_worker_control,
252 );
253}
#define ShmemRequestStruct(...)
Definition shmem.h:176
const char * name

References fb(), io_worker_control, io_worker_queue_size, io_worker_submission_queue, MAX_IO_WORKERS, name, pg_nextpower2_32(), and ShmemRequestStruct.

◆ pgaio_worker_submission_queue_consume()

static int pgaio_worker_submission_queue_consume ( void  )
static

Definition at line 436 of file method_worker.c.

437{
439 int result;
440
442
444 if (queue->tail == queue->head)
445 return -1; /* empty */
446
447 result = queue->sqes[queue->tail];
448 queue->tail = (queue->tail + 1) & (queue->size - 1);
449
450 return result;
451}
uint32 result
int sqes[FLEXIBLE_ARRAY_MEMBER]

References Assert, fb(), PgAioWorkerSubmissionQueue::head, io_worker_submission_queue, LW_EXCLUSIVE, LWLockHeldByMeInMode(), result, PgAioWorkerSubmissionQueue::size, PgAioWorkerSubmissionQueue::sqes, and PgAioWorkerSubmissionQueue::tail.

Referenced by IoWorkerMain().

◆ pgaio_worker_submission_queue_depth()

static uint32 pgaio_worker_submission_queue_depth ( void  )
static

Definition at line 454 of file method_worker.c.

455{
456 uint32 head;
457 uint32 tail;
458
460
463
464 if (tail > head)
466
467 Assert(head >= tail);
468
469 return head - tail;
470}

References Assert, fb(), PgAioWorkerSubmissionQueue::head, io_worker_submission_queue, LW_EXCLUSIVE, LWLockHeldByMeInMode(), PgAioWorkerSubmissionQueue::size, and PgAioWorkerSubmissionQueue::tail.

Referenced by IoWorkerMain().

◆ pgaio_worker_submission_queue_insert()

static bool pgaio_worker_submission_queue_insert ( PgAioHandle * ioh)
static

Definition at line 413 of file method_worker.c.

414{
417
419
421 new_head = (queue->head + 1) & (queue->size - 1);
422 if (new_head == queue->tail)
423 {
424 pgaio_debug(DEBUG3, "io queue is full, at %u elements",
426 return false; /* full */
427 }
428
429 queue->sqes[queue->head] = pgaio_io_get_id(ioh);
430 queue->head = new_head;
431
432 return true;
433}
int pgaio_io_get_id(PgAioHandle *ioh)
Definition aio.c:342
#define pgaio_debug(elevel, msg,...)
#define DEBUG3
Definition elog.h:29

References Assert, DEBUG3, fb(), PgAioWorkerSubmissionQueue::head, io_worker_submission_queue, LW_EXCLUSIVE, LWLockHeldByMeInMode(), pgaio_debug, pgaio_io_get_id(), PgAioWorkerSubmissionQueue::size, PgAioWorkerSubmissionQueue::sqes, and PgAioWorkerSubmissionQueue::tail.

Referenced by pgaio_worker_submit().

◆ pgaio_worker_submit()

static int pgaio_worker_submit ( uint16  num_staged_ios,
PgAioHandle **  staged_ios 
)
static

Definition at line 482 of file method_worker.c.

483{
485 int nsync = 0;
486 int worker = -1;
487
488 Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
489
490 for (int i = 0; i < num_staged_ios; i++)
491 pgaio_io_prepare_submit(staged_ios[i]);
492
494 {
495 for (int i = 0; i < num_staged_ios; ++i)
496 {
498 if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
499 {
500 /*
501 * If the queue is full, give up and do the rest
502 * synchronously. We're holding an exclusive lock on the
503 * queue so nothing can consume entries.
504 */
505 synchronous_ios = &staged_ios[i];
506 nsync = (num_staged_ios - i);
507
508 break;
509 }
510 }
511 /* Choose one worker to wake for this batch. */
512 worker = pgaio_worker_choose_idle(-1);
514
515 /* Wake up chosen worker. It will wake peers if necessary. */
516 if (worker != -1)
517 pgaio_worker_wake(worker);
518 }
519 else
520 {
521 /* do everything synchronously, no wakeup needed */
522 synchronous_ios = staged_ios;
523 nsync = num_staged_ios;
524 }
525
526 /* Run whatever is left synchronously. */
527 if (nsync > 0)
528 {
529 for (int i = 0; i < nsync; ++i)
530 {
532 }
533 }
534
535 return num_staged_ios;
536}
void pgaio_io_prepare_submit(PgAioHandle *ioh)
Definition aio.c:510
#define PGAIO_SUBMIT_BATCH_SIZE
bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1321
static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh)

References Assert, fb(), i, LW_EXCLUSIVE, LWLockConditionalAcquire(), LWLockRelease(), pgaio_io_perform_synchronously(), pgaio_io_prepare_submit(), PGAIO_SUBMIT_BATCH_SIZE, pgaio_worker_choose_idle(), pgaio_worker_needs_synchronous_execution(), pgaio_worker_submission_queue_insert(), and pgaio_worker_wake().

◆ pgaio_worker_wake()

static void pgaio_worker_wake ( int  worker)
static

Definition at line 385 of file method_worker.c.

386{
387 ProcNumber proc_number;
388
389 /*
390 * If the selected worker is concurrently exiting, then pgaio_worker_die()
391 * had not yet removed it as of when we saw it in idle_workerset. That's
392 * OK, because it will wake all remaining workers to close wakeup-vs-exit
393 * races: *someone* will see the queued IO. If there are no workers
394 * running, the postmaster will start a new one.
395 */
396 proc_number = io_worker_control->workers[worker].proc_number;
397 if (proc_number != INVALID_PROC_NUMBER)
398 SetLatch(&GetPGProcByNumber(proc_number)->procLatch);
399}
void SetLatch(Latch *latch)
Definition latch.c:290

References GetPGProcByNumber, INVALID_PROC_NUMBER, io_worker_control, PgAioWorkerSlot::proc_number, SetLatch(), and PgAioWorkerControl::workers.

Referenced by IoWorkerMain(), pgaio_worker_submit(), and pgaio_workerset_wake().

◆ pgaio_workers_enabled()

bool pgaio_workers_enabled ( void  )

Definition at line 1028 of file method_worker.c.

1029{
1030 return io_method == IOMETHOD_WORKER;
1031}
int io_method
Definition aio.c:74
@ IOMETHOD_WORKER
Definition aio.h:35

References io_method, and IOMETHOD_WORKER.

Referenced by maybe_start_io_workers_scheduled_at().

◆ pgaio_workerset_all()

static void pgaio_workerset_all ( PgAioWorkerSet set)
static

Definition at line 163 of file method_worker.c.

164{
166}
#define PGAIO_WORKERSET_BITS

References fb(), MAX_IO_WORKERS, and PGAIO_WORKERSET_BITS.

Referenced by pgaio_worker_register().

◆ pgaio_workerset_get_highest()

static int pgaio_workerset_get_highest ( PgAioWorkerSet set)
static

Definition at line 196 of file method_worker.c.

197{
198 Assert(!pgaio_workerset_is_empty(set));
199 return pg_leftmost_one_pos64(*set);
200}
static int pg_leftmost_one_pos64(uint64 word)
Definition pg_bitutils.h:72

References Assert, pg_leftmost_one_pos64(), and pgaio_workerset_is_empty().

Referenced by pgaio_worker_can_timeout().

◆ pgaio_workerset_get_lowest()

static int pgaio_workerset_get_lowest ( PgAioWorkerSet set)
static

Definition at line 203 of file method_worker.c.

204{
205 Assert(!pgaio_workerset_is_empty(set));
206 return pg_rightmost_one_pos64(*set);
207}
static int pg_rightmost_one_pos64(uint64 word)

References Assert, pg_rightmost_one_pos64(), and pgaio_workerset_is_empty().

Referenced by pgaio_worker_choose_idle(), pgaio_worker_register(), and pgaio_workerset_pop_lowest().

◆ pgaio_workerset_initialize()

static void pgaio_workerset_initialize ( PgAioWorkerSet set)
static

Definition at line 144 of file method_worker.c.

145{
146 *set = 0;
147}

Referenced by pgaio_worker_shmem_init().

◆ pgaio_workerset_insert()

static void pgaio_workerset_insert ( PgAioWorkerSet set,
int  worker 
)
static

Definition at line 175 of file method_worker.c.

176{
177 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
178 *set |= pgaio_workerset_singleton(worker);
179}
static PgAioWorkerSet pgaio_workerset_singleton(int worker)

References Assert, MAX_IO_WORKERS, and pgaio_workerset_singleton().

Referenced by IoWorkerMain(), and pgaio_worker_register().

◆ pgaio_workerset_is_empty()

static bool pgaio_workerset_is_empty ( PgAioWorkerSet set)
static

◆ pgaio_workerset_pop_lowest()

static int pgaio_workerset_pop_lowest ( PgAioWorkerSet set)
static

Definition at line 210 of file method_worker.c.

211{
212 int worker = pgaio_workerset_get_lowest(set);
213
214 pgaio_workerset_remove(set, worker);
215 return worker;
216}

References pgaio_workerset_get_lowest(), and pgaio_workerset_remove().

Referenced by pgaio_workerset_wake().

◆ pgaio_workerset_remove()

static void pgaio_workerset_remove ( PgAioWorkerSet set,
int  worker 
)
static

Definition at line 182 of file method_worker.c.

183{
184 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
185 *set &= ~pgaio_workerset_singleton(worker);
186}

References Assert, MAX_IO_WORKERS, and pgaio_workerset_singleton().

Referenced by IoWorkerMain(), pgaio_worker_choose_idle(), pgaio_worker_die(), and pgaio_workerset_pop_lowest().

◆ pgaio_workerset_remove_lte()

static void pgaio_workerset_remove_lte ( PgAioWorkerSet set,
int  worker 
)
static

Definition at line 189 of file method_worker.c.

190{
191 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
192 *set &= (~(PgAioWorkerSet) 0) << (worker + 1);
193}

References Assert, and MAX_IO_WORKERS.

Referenced by pgaio_worker_choose_idle().

◆ pgaio_workerset_singleton()

static PgAioWorkerSet pgaio_workerset_singleton ( int  worker)
static

Definition at line 156 of file method_worker.c.

157{
158 Assert(worker >= 0 && worker < MAX_IO_WORKERS);
159 return UINT64_C(1) << worker;
160}

References Assert, fb(), and MAX_IO_WORKERS.

Referenced by pgaio_workerset_insert(), and pgaio_workerset_remove().

◆ pgaio_workerset_subtract()

static void pgaio_workerset_subtract ( PgAioWorkerSet set1,
const PgAioWorkerSet set2 
)
static

Definition at line 169 of file method_worker.c.

170{
171 *set1 &= ~*set2;
172}

References fb().

Referenced by pgaio_worker_register().

◆ pgaio_workerset_wake()

static void pgaio_workerset_wake ( PgAioWorkerSet  workerset)
static

Definition at line 406 of file method_worker.c.

407{
408 while (!pgaio_workerset_is_empty(&workerset))
409 pgaio_worker_wake(pgaio_workerset_pop_lowest(&workerset));
410}
static int pgaio_workerset_pop_lowest(PgAioWorkerSet *set)

References pgaio_worker_wake(), pgaio_workerset_is_empty(), and pgaio_workerset_pop_lowest().

Referenced by pgaio_worker_die(), and pgaio_worker_register().

Variable Documentation

◆ io_max_workers

int io_max_workers = 8

◆ io_min_workers

int io_min_workers = 2

◆ io_worker_control

◆ io_worker_idle_timeout

int io_worker_idle_timeout = 60000

Definition at line 133 of file method_worker.c.

Referenced by IoWorkerMain().

◆ io_worker_launch_interval

int io_worker_launch_interval = 100

Definition at line 134 of file method_worker.c.

Referenced by maybe_start_io_workers().

◆ io_worker_queue_size

int io_worker_queue_size = 64
static

Definition at line 137 of file method_worker.c.

Referenced by pgaio_worker_shmem_init(), and pgaio_worker_shmem_request().

◆ io_worker_submission_queue

◆ MyIoWorkerId

int MyIoWorkerId = -1
static

◆ pgaio_worker_ops

const IoMethodOps pgaio_worker_ops
Initial value:
= {
.shmem_callbacks.request_fn = pgaio_worker_shmem_request,
.shmem_callbacks.init_fn = pgaio_worker_shmem_init,
.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
.submit = pgaio_worker_submit,
}
static int pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
static void pgaio_worker_shmem_request(void *arg)
static void pgaio_worker_shmem_init(void *arg)

Definition at line 121 of file method_worker.c.

121 {
122 .shmem_callbacks.request_fn = pgaio_worker_shmem_request,
123 .shmem_callbacks.init_fn = pgaio_worker_shmem_init,
124
125 .needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
126 .submit = pgaio_worker_submit,
127};