PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
aio_callback.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * aio_callback.c
4 * AIO - Functionality related to callbacks that can be registered on IO
5 * Handles
6 *
7 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/backend/storage/aio/aio_callback.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16#include "postgres.h"
17
18#include "miscadmin.h"
19#include "storage/aio.h"
21
22
23/* just to have something to put into aio_handle_cbs */
/*
 * NOTE(review): the declaration the comment above describes is missing from
 * this listing. The symbol index at the end of the listing records it as
 * "static const PgAioHandleCallbacks aio_invalid_cb"; its initializer is not
 * shown anywhere here -- restore it from the original source.
 */
25
/*
 * NOTE(review): the line opening the entry struct and the struct's first
 * member are missing as well. Per the symbol index the type is
 * PgAioHandleCallbacksEntry and the missing member is
 * "const PgAioHandleCallbacks *const cb". The remaining member, name, is
 * filled with the stringified callback identifier by CALLBACK_ENTRY below.
 */
27{
29 const char *const name;
31
32/*
33 * Callback definition for the callbacks that can be registered on an IO
34 * handle. See PgAioHandleCallbackID's definition for an explanation for why
35 * callbacks are not identified by a pointer.
36 */
/*
 * NOTE(review): the declarator line of the table is missing; the symbol
 * index gives it as "static const PgAioHandleCallbacksEntry aio_handle_cbs[]".
 *
 * CALLBACK_ENTRY(id, callback) expands to a designated initializer that
 * places the entry at index id, pointing .cb at the callback struct and
 * setting .name to the callback's identifier as a string. The macro is
 * local to the table and is undefined again right after it.
 */
38#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
/* NOTE(review): the table's CALLBACK_ENTRY(...) rows are missing from this listing. */
40#undef CALLBACK_ENTRY
41};
42
43
44
45/* --------------------------------------------------------------------------------
46 * Public callback related functions operating on IO Handles
47 * --------------------------------------------------------------------------------
48 */
49
50/*
51 * Register callback for the IO handle.
52 *
53 * Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be
54 * registered for each IO.
55 *
56 * Callbacks need to be registered before [indirectly] calling
57 * pgaio_io_start_*(), as the IO may be executed immediately.
58 *
59 * A callback can be passed a small bit of data, e.g. to indicate whether to
60 * zero a buffer if it is invalid.
61 *
62 *
63 * Note that callbacks are executed in critical sections. This is necessary
64 * to be able to execute IO in critical sections (consider e.g. WAL
65 * logging). To perform AIO we first need to acquire a handle, which, if there
66 * are no free handles, requires waiting for IOs to complete and to execute
67 * their completion callbacks.
68 *
69 * Callbacks may be executed in the issuing backend but also in another
70 * backend (because that backend is waiting for the IO) or in IO workers (if
71 * io_method=worker is used).
72 *
73 *
74 * See PgAioHandleCallbackID's definition for an explanation for why
75 * callbacks are not identified by a pointer.
76 */
/*
 * NOTE(review): the line carrying the function name and its first parameters
 * is missing from this listing. The symbol index at the end of the listing
 * gives the full signature as
 *   void pgaio_io_register_callbacks(PgAioHandle *ioh,
 *                                    PgAioHandleCallbackID cb_id,
 *                                    uint8 cb_data)
 */
77void
79 uint8 cb_data)
80{
/*
 * The entry pointer is formed before cb_id has been range-checked; it is
 * only dereferenced (for ->name, in the log message) after the checks below.
 * NOTE(review): consider forming it after the range check instead.
 */
81 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
82
/* reject ids that do not index into the callback table */
83 if (cb_id >= lengthof(aio_handle_cbs))
84 elog(ERROR, "callback %d is out of range", cb_id);
/* a callback providing neither completion hook cannot be registered */
85 if (aio_handle_cbs[cb_id].cb->complete_shared == NULL &&
86 aio_handle_cbs[cb_id].cb->complete_local == NULL)
87 elog(ERROR, "callback %d does not have a completion callback", cb_id);
/*
 * NOTE(review): the condition guarding the following elog and the elog's
 * trailing argument line are missing from this listing. Judging by the
 * message and the header comment this is the PGAIO_HANDLE_MAX_CALLBACKS
 * limit check -- confirm against the original source.
 */
89 elog(PANIC, "too many callbacks, the max is %d",
/* store id and per-callback data in the next free slot */
91 ioh->callbacks[ioh->num_callbacks] = cb_id;
92 ioh->callbacks_data[ioh->num_callbacks] = cb_data;
93
/*
 * NOTE(review): the head of this logging call is missing; the symbol index
 * lists pgaio_debug_io(elevel, ioh, msg, ...), presumably the callee. The
 * log level cannot be determined from this listing.
 */
95 "adding cb #%d, id %d/%s",
96 ioh->num_callbacks + 1,
97 cb_id, ce->name);
98
/* the slot only counts as registered once the count is bumped */
99 ioh->num_callbacks++;
100}
101
102/*
103 * Associate an array of data with the Handle. This is e.g. useful to
104 * transport knowledge about which buffers a multi-block IO affects to
105 * completion callbacks.
106 *
107 * Right now this can be done only once for each IO, even though multiple
108 * callbacks can be registered. There aren't any known usecases requiring more
109 * and the required amount of shared memory does add up, so it doesn't seem
110 * worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS.
111 */
/*
 * NOTE(review): the line carrying the function name and parameters is
 * missing from this listing. The symbol index at the end of the listing
 * gives the signature as
 *   void pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
 */
112void
114{
/*
 * NOTE(review): one line before and one line after this assertion are
 * missing from the listing (residual numbers 115 and 117).
 */
/* handle data may be set only once per IO, so none may be present yet */
116 Assert(ioh->handle_data_len == 0);
118
/*
 * NOTE(review): the loop body (residual number 120) is missing, which makes
 * the length assignment below look like the body. Presumably the body copies
 * data[i] into the handle's slice of pgaio_ctl->handle_data, which is where
 * pgaio_io_get_handle_data() reads from -- confirm against the original.
 */
119 for (int i = 0; i < len; i++)
121 ioh->handle_data_len = len;
122}
123
124/*
125 * Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit
126 * array to a 64bit array. Without it callers would end up needing to
127 * open-code equivalent code.
128 */
/*
 * NOTE(review): the line carrying the function name and parameters is
 * missing from this listing. The symbol index at the end of the listing
 * gives the signature as
 *   void pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
 */
129void
131{
/*
 * NOTE(review): one line before and one line after this assertion are
 * missing from the listing (residual numbers 132 and 134).
 */
/* handle data may be set only once per IO, so none may be present yet */
133 Assert(ioh->handle_data_len == 0);
135
/*
 * NOTE(review): the loop body (residual number 137) is missing, which makes
 * the length assignment below look like the body. Presumably the body stores
 * each 32bit data[i] as a 64bit element of the handle's data array, as the
 * header comment describes -- confirm against the original.
 */
136 for (int i = 0; i < len; i++)
138 ioh->handle_data_len = len;
139}
140
141/*
142 * Return data set with pgaio_io_set_handle_data_*().
143 */
144uint64 *
146{
147 Assert(ioh->handle_data_len > 0);
148
149 *len = ioh->handle_data_len;
150
151 return &pgaio_ctl->handle_data[ioh->iovec_off];
152}
153
154
155
156/* --------------------------------------------------------------------------------
157 * Public IO Result related functions
158 * --------------------------------------------------------------------------------
159 */
160
161void
162pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
163{
164 PgAioHandleCallbackID cb_id = result.id;
165 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
166
167 Assert(result.status != PGAIO_RS_UNKNOWN);
168 Assert(result.status != PGAIO_RS_OK);
169
170 if (ce->cb->report == NULL)
171 elog(ERROR, "callback %d/%s does not have report callback",
172 result.id, ce->name);
173
174 ce->cb->report(result, target_data, elevel);
175}
176
177
178
179/* --------------------------------------------------------------------------------
180 * Internal callback related functions operating on IO Handles
181 * --------------------------------------------------------------------------------
182 */
183
184/*
185 * Internal function which invokes ->stage for all the registered callbacks.
186 */
/*
 * NOTE(review): the line carrying the function name and parameters is
 * missing from this listing. The symbol index at the end of the listing
 * gives the signature as
 *   void pgaio_io_call_stage(PgAioHandle *ioh)
 */
187void
189{
/*
 * NOTE(review): one line before this assertion (residual number 190) is
 * missing. The symbol index lists PGAIO_TID_INVALID / PGAIO_TID_COUNT and the
 * handle's target member, none of which appear in the visible code, so it is
 * presumably a matching range assertion on ioh->target -- confirm.
 */
/* the handle's operation must be set to a valid value */
191 Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
192
/* walk the registered callbacks from the last registered to the first */
193 for (int i = ioh->num_callbacks; i > 0; i--)
194 {
195 PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
196 uint8 cb_data = ioh->callbacks_data[i - 1];
197 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
198
/* ->stage is optional; skip callbacks that do not provide it */
199 if (!ce->cb->stage)
200 continue;
201
/*
 * NOTE(review): the head of this logging call (residual number 202) is
 * missing; presumably pgaio_debug_io(), level not determinable from here.
 */
203 "calling cb #%d %d/%s->stage(%u)",
204 i, cb_id, ce->name, cb_data);
/* hand the callback the per-registration data byte given at registration */
205 ce->cb->stage(ioh, cb_data);
206 }
207}
208
209/*
210 * Internal function which invokes ->complete_shared for all the registered
211 * callbacks.
212 */
/*
 * NOTE(review): the line carrying the function name and parameters is
 * missing from this listing. The symbol index at the end of the listing
 * gives the signature as
 *   void pgaio_io_call_complete_shared(PgAioHandle *ioh)
 */
213void
215{
216 PgAioResult result;
217
/*
 * NOTE(review): the statements with residual numbers 218 and 220 are missing
 * here, and 258 is missing before the closing brace. The symbol index lists
 * START_CRIT_SECTION() / END_CRIT_SECTION(), which do not appear in the
 * visible code, so presumably the body is wrapped in a critical section --
 * confirm against the original source.
 */
219
/* the handle's operation must be set to a valid value */
221 Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
222
/*
 * Seed the result passed to the first callback: status OK, the raw IO
 * result, no originating callback and no error data.
 */
223 result.status = PGAIO_RS_OK; /* low level IO is always considered OK */
224 result.result = ioh->result;
225 result.id = PGAIO_HCB_INVALID;
226 result.error_data = 0;
227
228 /*
229 * Call callbacks with the last registered (innermost) callback first.
230 * Each callback can modify the result forwarded to the next callback.
231 */
232 for (int i = ioh->num_callbacks; i > 0; i--)
233 {
234 PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
235 uint8 cb_data = ioh->callbacks_data[i - 1];
236 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
237
/* ->complete_shared is optional; skip callbacks that do not provide it */
238 if (!ce->cb->complete_shared)
239 continue;
240
/*
 * NOTE(review): the head of this logging call (residual number 241) and the
 * argument for the "status %s" conversion (residual number 245) are missing.
 * The symbol index lists pgaio_debug_io() and pgaio_result_status_string(),
 * presumably the callee and the missing argument respectively.
 */
242 "calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)",
243 i, cb_id, ce->name,
244 cb_data,
246 result.id, result.error_data, result.result);
/* the callback's return value replaces the result seen by the next one */
247 result = ce->cb->complete_shared(ioh, result, cb_data);
248 }
249
/* keep the final result in the handle */
250 ioh->distilled_result = result;
251
/*
 * NOTE(review): as above, the head of this logging call (residual number
 * 252) and the "status %s" argument (residual number 254) are missing.
 */
253 "after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d",
255 result.id, result.error_data, result.result,
256 ioh->result);
257
259}
260
261/*
262 * Internal function which invokes ->complete_local for all the registered
263 * callbacks.
264 *
265 * Returns ioh->distilled_result after, possibly, being modified by local
266 * callbacks.
267 *
268 * XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared().
269 */
/*
 * NOTE(review): both the return type line and the line carrying the function
 * name and parameters are missing from this listing. The symbol index at the
 * end of the listing gives the signature as
 *   PgAioResult pgaio_io_call_complete_local(PgAioHandle *ioh)
 */
272{
273 PgAioResult result;
274
/*
 * NOTE(review): the statements with residual numbers 275 and 277 are missing
 * here, and 312 is missing before the return. The symbol index lists
 * START_CRIT_SECTION() / END_CRIT_SECTION(), which do not appear in the
 * visible code, so presumably the body is wrapped in a critical section --
 * confirm against the original source.
 */
276
/* the handle's operation must be set to a valid value */
278 Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
279
280 /* start with distilled result from shared callback */
281 result = ioh->distilled_result;
282
/* walk the registered callbacks from the last registered to the first */
283 for (int i = ioh->num_callbacks; i > 0; i--)
284 {
285 PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
286 uint8 cb_data = ioh->callbacks_data[i - 1];
287 const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
288
/* ->complete_local is optional; skip callbacks that do not provide it */
289 if (!ce->cb->complete_local)
290 continue;
291
/*
 * NOTE(review): the head of this logging call (residual number 292) and the
 * argument for the "status %s" conversion (residual number 295) are missing.
 * The symbol index lists pgaio_debug_io() and pgaio_result_status_string(),
 * presumably the callee and the missing argument respectively.
 */
293 "calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d",
294 i, cb_id, ce->name, cb_data,
296 result.id, result.error_data, result.result);
/* the callback's return value replaces the result seen by the next one */
297 result = ce->cb->complete_local(ioh, result, cb_data);
298 }
299
300 /*
301 * Note that we don't save the result in ioh->distilled_result, the local
302 * callback's result should not ever matter to other waiters. However, the
303 * local backend does care, so we return the result as modified by local
304 * callbacks, which then can be passed to ioh->report_return->result.
305 */
/*
 * NOTE(review): as above, the head of this logging call (residual number
 * 306) and the "status %s" argument (residual number 308) are missing.
 */
307 "after local completion: result: (status %s, id %u, error_data %d, result %d), raw_result: %d",
309 result.id, result.error_data, result.result,
310 ioh->result);
311
313
314 return result;
315}
const char * pgaio_result_status_string(PgAioResultStatus rs)
Definition: aio.c:834
PgAioCtl * pgaio_ctl
Definition: aio.c:79
PgAioHandleCallbackID
Definition: aio.h:192
@ PGAIO_HCB_INVALID
Definition: aio.h:193
#define PGAIO_HANDLE_MAX_CALLBACKS
Definition: aio.h:256
#define PGAIO_TID_COUNT
Definition: aio.h:122
#define PGAIO_OP_COUNT
Definition: aio.h:107
@ PGAIO_TID_INVALID
Definition: aio.h:119
@ PGAIO_OP_INVALID
Definition: aio.h:90
void pgaio_io_call_stage(PgAioHandle *ioh)
Definition: aio_callback.c:188
static const PgAioHandleCallbacksEntry aio_handle_cbs[]
Definition: aio_callback.c:37
void pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
Definition: aio_callback.c:130
PgAioResult pgaio_io_call_complete_local(PgAioHandle *ioh)
Definition: aio_callback.c:271
void pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id, uint8 cb_data)
Definition: aio_callback.c:78
void pgaio_io_call_complete_shared(PgAioHandle *ioh)
Definition: aio_callback.c:214
uint64 * pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
Definition: aio_callback.c:145
void pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
Definition: aio_callback.c:113
static const PgAioHandleCallbacks aio_invalid_cb
Definition: aio_callback.c:24
struct PgAioHandleCallbacksEntry PgAioHandleCallbacksEntry
#define CALLBACK_ENTRY(id, callback)
void pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
Definition: aio_callback.c:162
@ PGAIO_HS_HANDED_OUT
Definition: aio_internal.h:48
#define pgaio_debug_io(elevel, ioh, msg,...)
Definition: aio_internal.h:368
@ PGAIO_RS_OK
Definition: aio_types.h:77
@ PGAIO_RS_UNKNOWN
Definition: aio_types.h:76
uint8_t uint8
Definition: c.h:500
uint64_t uint64
Definition: c.h:503
uint32_t uint32
Definition: c.h:502
#define lengthof(array)
Definition: c.h:759
#define DEBUG3
Definition: elog.h:28
#define PANIC
Definition: elog.h:42
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define DEBUG4
Definition: elog.h:27
Assert(PointerIsAligned(start, uint64))
int i
Definition: isn.c:77
#define START_CRIT_SECTION()
Definition: miscadmin.h:149
#define END_CRIT_SECTION()
Definition: miscadmin.h:151
const void size_t len
const void * data
#define PG_IOV_MAX
Definition: pg_iovec.h:41
uint64 * handle_data
Definition: aio_internal.h:238
const PgAioHandleCallbacks *const cb
Definition: aio_callback.c:28
const char *const name
Definition: aio_callback.c:29
PgAioHandleCallbackComplete complete_shared
Definition: aio.h:228
PgAioHandleCallbackStage stage
Definition: aio.h:208
PgAioHandleCallbackReport report
Definition: aio.h:247
PgAioHandleCallbackComplete complete_local
Definition: aio.h:240
PgAioResult distilled_result
Definition: aio_internal.h:151
uint8 callbacks[PGAIO_HANDLE_MAX_CALLBACKS]
Definition: aio_internal.h:108
uint8 handle_data_len
Definition: aio_internal.h:117
PgAioOp op
Definition: aio_internal.h:100
uint32 iovec_off
Definition: aio_internal.h:159
uint8 callbacks_data[PGAIO_HANDLE_MAX_CALLBACKS]
Definition: aio_internal.h:111
uint8 num_callbacks
Definition: aio_internal.h:105
PgAioHandleState state
Definition: aio_internal.h:94
PgAioTargetID target
Definition: aio_internal.h:97
uint32 status
Definition: aio_types.h:95
uint32 error_data
Definition: aio_types.h:98
int32 result
Definition: aio_types.h:100
uint32 id
Definition: aio_types.h:92