18from collections
import defaultdict
19from typing
import Dict
22ssl_cert = ssl_dir +
"/server-localhost-alt-names.crt"
23ssl_key = ssl_dir +
"/server-localhost-alt-names.key"
28 Core implementation of the authorization server. The API is
29 inheritance-based, with entry points at do_GET() and do_POST(). See the
30 documentation for BaseHTTPRequestHandler.
33 JsonObject = Dict[str, object]
37 Switches the behavior of the provider depending on the issuer URI.
41 or self.
path ==
"/.well-known/oauth-authorization-server/alternate"
58 Checks the expected value of the Authorization header, if any.
60 secret = self.
_get_param(
"expected_secret",
None)
64 assert "Authorization" in self.headers
65 method, creds = self.headers[
"Authorization"].
split()
68 raise RuntimeError(f
"client used {method} auth; expected Basic")
74 expected_creds = f
"{username}:{password}"
78 f
"client sent '{creds}'; expected b64encode('{expected_creds}')"
85 config_path =
"/.well-known/openid-configuration"
87 config_path =
"/.well-known/oauth-authorization-server"
89 if self.
path == config_path:
92 self.send_error(404,
"Not Found")
99 Parses apart the form-urlencoded request body and returns the resulting
100 dict. For use by do_POST().
102 size =
int(self.headers[
"Content-Length"])
103 form = self.rfile.
read(size)
105 assert self.headers[
"Content-Type"] ==
"application/x-www-form-urlencoded"
109 keep_blank_values=
True,
117 Returns the client_id sent in the POST body or the Authorization header.
118 self._parse_params() must have been called first.
120 if "client_id" in self.
_params:
121 return self.
_params[
"client_id"][0]
123 if "Authorization" not in self.headers:
126 _, creds = self.headers[
"Authorization"].
split()
146 if self.
path ==
"/authorize":
148 elif self.
path ==
"/token":
158 Returns True if the client has requested a modification to this stage of
161 if not hasattr(self,
"_test_params"):
170 and self.
path ==
"/.well-known/openid-configuration"
172 or (stage ==
"device" and self.
path ==
"/authorize")
173 or (stage ==
"token" and self.
path ==
"/token")
178 If the client has requested a modification to this stage (see
179 _should_modify()), this method searches the provided test parameters for
180 a key of the given name, and returns it if found. Otherwise the provided
191 Returns "application/json" unless the test has requested something
194 return self.
_get_param(
"content_type",
"application/json")
199 Returns 0 unless the test has requested something different.
206 Returns "authorization_pending" unless the test has requested something
209 return self.
_get_param(
"retry_code",
"authorization_pending")
214 Returns "verification_uri" unless the test has requested something
217 return self.
_get_param(
"uri_spelling",
"verification_uri")
222 Returns a dict with any additional entries that should be folded into a
223 JSON response, as determined by test parameters provided by the client:
225 - huge_response: if set to True, the dict will contain a gigantic string
228 - nested_array: if set to nonzero, the dict will contain a deeply nested
229 array so that the top-level object has the given depth
231 - nested_object: if set to nonzero, the dict will contain a deeply
232 nested JSON object so that the top-level object has the given depth
237 ret[
"_pad_"] =
"x" * 1024 * 1024
252 The actual Bearer token sent back to the client on success. Tests may
253 override this with the "token" test parameter.
256 if token
is not None:
267 Trims the response JSON, if necessary, and logs it for later debugging.
276 js[
"_pad_"] = pad[:64] + f
"[...truncated from {len(pad)} bytes]"
279 self.log_message(
"sending JSON response: %s", resp)
283 assert len(resp) < 1024,
"_log_response must be adjusted for new JSON"
287 Sends the provided JSON dict as an application/json response.
288 self._response_code can be modified to send JSON error responses.
295 self.send_header(
"Content-Length",
str(
len(resp)))
298 self.wfile.
write(resp)
307 issuer = f
"https://127.0.0.1:{port}"
309 issuer +=
"/alternate"
315 "token_endpoint": issuer +
"/token",
316 "device_authorization_endpoint": issuer +
"/authorize",
317 "response_types_supported": [
"token"],
318 "subject_types_supported": [
"public"],
319 "id_token_signing_alg_values_supported": [
"RS256"],
320 "grant_types_supported": [
321 "authorization_code",
322 "urn:ietf:params:oauth:grant-type:device_code",
329 A cached _TokenState object for the connected client (as determined by
330 the request's client_id), or a new one if it doesn't already exist.
332 This relies on the existence of a defaultdict attached to the server;
335 return self.server.token_state[self.
client_id]
339 Removes any cached _TokenState for the current client_id. Call this
340 after the token exchange ends to get rid of unnecessary state.
342 if self.
client_id in self.server.token_state:
343 del self.server.token_state[self.
client_id]
346 uri =
"https://example.com/"
348 uri =
"https://example.org/"
351 "device_code":
"postgres",
352 "user_code":
"postgresuser",
359 if interval
is not None:
360 resp[
"interval"] = interval
367 assert self.
_params[
"scope"][0],
"empty scopes should be omitted"
376 resp = {
"error": err}
380 resp[
"error_description"] = desc
393 ), f
"client waited only {delay} seconds between token requests (expected {self._token_state.min_delay})"
410 "token_type":
"bearer",
417 Starts the authorization server on localhost. The ephemeral port in use will
418 be printed to stdout.
455if __name__ ==
"__main__":
void print(const void *obj)
Dict[str, str] _parse_params(self)
JsonObject authorization(self)
None _send_json(self, JsonObject js)
None _log_response(self, JsonObject js)
_remove_token_state(self)
_get_param(self, name, default)
bool _should_modify(self)
static struct cvec * range(struct vars *v, chr a, chr b, int cases)