17from collections
import defaultdict
18from typing
import Dict
23 Core implementation of the authorization server. The API is
24 inheritance-based, with entry points at do_GET() and do_POST(). See the
25 documentation for BaseHTTPRequestHandler.
28 JsonObject = Dict[str, object]
32 Switches the behavior of the provider depending on the issuer URI.
36 or self.
path ==
"/.well-known/oauth-authorization-server/alternate"
53 Checks the expected value of the Authorization header, if any.
55 secret = self.
_get_param(
"expected_secret",
None)
59 assert "Authorization" in self.headers
60 method, creds = self.headers[
"Authorization"].
split()
63 raise RuntimeError(f
"client used {method} auth; expected Basic")
69 expected_creds = f
"{username}:{password}"
73 f
"client sent '{creds}'; expected b64encode('{expected_creds}')"
80 config_path =
"/.well-known/openid-configuration"
82 config_path =
"/.well-known/oauth-authorization-server"
84 if self.
path == config_path:
87 self.send_error(404,
"Not Found")
94 Parses apart the form-urlencoded request body and returns the resulting
95 dict. For use by do_POST().
97 size =
int(self.headers[
"Content-Length"])
98 form = self.rfile.
read(size)
100 assert self.headers[
"Content-Type"] ==
"application/x-www-form-urlencoded"
104 keep_blank_values=
True,
112 Returns the client_id sent in the POST body or the Authorization header.
113 self._parse_params() must have been called first.
115 if "client_id" in self.
_params:
116 return self.
_params[
"client_id"][0]
118 if "Authorization" not in self.headers:
121 _, creds = self.headers[
"Authorization"].
split()
141 if self.
path ==
"/authorize":
143 elif self.
path ==
"/token":
153 Returns True if the client has requested a modification to this stage of
156 if not hasattr(self,
"_test_params"):
165 and self.
path ==
"/.well-known/openid-configuration"
167 or (stage ==
"device" and self.
path ==
"/authorize")
168 or (stage ==
"token" and self.
path ==
"/token")
173 If the client has requested a modification to this stage (see
174 _should_modify()), this method searches the provided test parameters for
175 a key of the given name, and returns it if found. Otherwise the provided
186 Returns "application/json" unless the test has requested something
189 return self.
_get_param(
"content_type",
"application/json")
194 Returns 0 unless the test has requested something different.
201 Returns "authorization_pending" unless the test has requested something
204 return self.
_get_param(
"retry_code",
"authorization_pending")
209 Returns "verification_uri" unless the test has requested something
212 return self.
_get_param(
"uri_spelling",
"verification_uri")
217 Returns a dict with any additional entries that should be folded into a
218 JSON response, as determined by test parameters provided by the client:
220 - huge_response: if set to True, the dict will contain a gigantic string
223 - nested_array: if set to nonzero, the dict will contain a deeply nested
224 array so that the top-level object has the given depth
226 - nested_object: if set to nonzero, the dict will contain a deeply
227 nested JSON object so that the top-level object has the given depth
232 ret[
"_pad_"] =
"x" * 1024 * 1024
247 The actual Bearer token sent back to the client on success. Tests may
248 override this with the "token" test parameter.
251 if token
is not None:
262 Trims the response JSON, if necessary, and logs it for later debugging.
271 js[
"_pad_"] = pad[:64] + f
"[...truncated from {len(pad)} bytes]"
274 self.log_message(
"sending JSON response: %s", resp)
278 assert len(resp) < 1024,
"_log_response must be adjusted for new JSON"
282 Sends the provided JSON dict as an application/json response.
283 self._response_code can be modified to send JSON error responses.
290 self.send_header(
"Content-Length",
str(
len(resp)))
293 self.wfile.
write(resp)
298 issuer = f
"http://127.0.0.1:{port}"
300 issuer +=
"/alternate"
306 "token_endpoint": issuer +
"/token",
307 "device_authorization_endpoint": issuer +
"/authorize",
308 "response_types_supported": [
"token"],
309 "subject_types_supported": [
"public"],
310 "id_token_signing_alg_values_supported": [
"RS256"],
311 "grant_types_supported": [
312 "authorization_code",
313 "urn:ietf:params:oauth:grant-type:device_code",
320 A cached _TokenState object for the connected client (as determined by
321 the request's client_id), or a new one if it doesn't already exist.
323 This relies on the existence of a defaultdict attached to the server;
326 return self.server.token_state[self.
client_id]
330 Removes any cached _TokenState for the current client_id. Call this
331 after the token exchange ends to get rid of unnecessary state.
333 if self.
client_id in self.server.token_state:
334 del self.server.token_state[self.
client_id]
337 uri =
"https://example.com/"
339 uri =
"https://example.org/"
342 "device_code":
"postgres",
343 "user_code":
"postgresuser",
350 if interval
is not None:
351 resp[
"interval"] = interval
358 assert self.
_params[
"scope"][0],
"empty scopes should be omitted"
367 resp = {
"error": err}
371 resp[
"error_description"] = desc
384 ), f
"client waited only {delay} seconds between token requests (expected {self._token_state.min_delay})"
401 "token_type":
"bearer",
408 Starts the authorization server on localhost. The ephemeral port in use will
409 be printed to stdout.
437if __name__ ==
"__main__":
void print(const void *obj)
Dict[str, str] _parse_params(self)
JsonObject authorization(self)
None _send_json(self, JsonObject js)
None _log_response(self, JsonObject js)
_remove_token_state(self)
_get_param(self, name, default)
bool _should_modify(self)
static struct cvec * range(struct vars *v, chr a, chr b, int cases)