16from collections
import defaultdict
17from typing
import Dict
22 Core implementation of the authorization server. The API is
23 inheritance-based,
with entry points at
do_GET()
and do_POST(). See the
24 documentation
for BaseHTTPRequestHandler.
27 JsonObject = Dict[str, object]
31 Switches the behavior of the provider depending on the issuer URI.
34 self.path.startswith("/alternate/")
35 or self.
path ==
"/.well-known/oauth-authorization-server/alternate"
43 if self.
path.startswith(
"/.well-known/"):
52 Checks the expected value of the Authorization header, if any.
54 secret = self._get_param("expected_secret",
None)
58 assert "Authorization" in self.headers
59 method, creds = self.headers[
"Authorization"].split()
62 raise RuntimeError(f
"client used {method} auth; expected Basic")
66 username = urllib.parse.quote_plus(self.
client_id, safe=
"~")
67 password = urllib.parse.quote_plus(secret, safe=
"~")
68 expected_creds = f
"{username}:{password}"
70 if creds.encode() != base64.b64encode(expected_creds.encode()):
72 f
"client sent '{creds}'; expected b64encode('{expected_creds}')"
79 config_path =
"/.well-known/openid-configuration"
81 config_path =
"/.well-known/oauth-authorization-server"
83 if self.
path == config_path:
86 self.send_error(404,
"Not Found")
93 Parses apart the form-urlencoded request body and returns the resulting
96 size = int(self.headers["Content-Length"])
97 form = self.rfile.
read(size)
99 assert self.headers[
"Content-Type"] ==
"application/x-www-form-urlencoded"
100 return urllib.parse.parse_qs(
101 form.decode(
"utf-8"),
103 keep_blank_values=
True,
111 Returns the client_id sent in the POST body
or the Authorization header.
114 if "client_id" in self.
_params:
115 return self.
_params[
"client_id"][0]
117 if "Authorization" not in self.headers:
118 raise RuntimeError(
"client did not send any client_id")
120 _, creds = self.headers[
"Authorization"].split()
122 decoded = base64.b64decode(creds).decode(
"utf-8")
123 username, _ = decoded.split(
":", 1)
125 return urllib.parse.unquote_plus(username)
140 if self.
path ==
"/authorize":
142 elif self.
path ==
"/token":
152 Returns True if the client has requested a modification to this stage of
155 if not hasattr(self,
"_test_params"):
164 and self.
path ==
"/.well-known/openid-configuration"
166 or (stage ==
"device" and self.
path ==
"/authorize")
167 or (stage ==
"token" and self.
path ==
"/token")
172 If the client has requested a modification to this stage (see
173 _should_modify()), this method searches the provided test parameters for
174 a key of the given name,
and returns it
if found. Otherwise the provided
185 Returns "application/json" unless the test has requested something
188 return self.
_get_param(
"content_type",
"application/json")
193 Returns 0 unless the test has requested something different.
200 Returns "authorization_pending" unless the test has requested something
203 return self.
_get_param(
"retry_code",
"authorization_pending")
208 Returns "verification_uri" unless the test has requested something
211 return self.
_get_param(
"uri_spelling",
"verification_uri")
216 If the huge_response test parameter is set to
True, returns a dict
217 containing a gigantic string value, which can then be folded into a JSON
220 if not self.
_get_param(
"huge_response",
False):
223 return {
"_pad_":
"x" * 1024 * 1024}
228 The actual Bearer token sent back to the client on success. Tests may
229 override this with the
"token" test parameter.
232 if token
is not None:
243 Sends the provided JSON dict as an application/json response.
246 resp = json.dumps(js).encode("ascii")
247 self.log_message(
"sending JSON response: %s", resp)
251 self.send_header(
"Content-Length",
str(
len(resp)))
254 self.wfile.
write(resp)
257 port = self.server.socket.getsockname()[1]
259 issuer = f
"http://127.0.0.1:{port}"
261 issuer +=
"/alternate"
267 "token_endpoint": issuer +
"/token",
268 "device_authorization_endpoint": issuer +
"/authorize",
269 "response_types_supported": [
"token"],
270 "subject_types_supported": [
"public"],
271 "id_token_signing_alg_values_supported": [
"RS256"],
272 "grant_types_supported": [
273 "authorization_code",
274 "urn:ietf:params:oauth:grant-type:device_code",
281 A cached _TokenState object for the connected client (
as determined by
282 the request
's client_id), or a new one if it doesn't already exist.
284 This relies on the existence of a defaultdict attached to the server;
287 return self.server.token_state[self.
client_id]
291 Removes any cached _TokenState for the current client_id. Call this
292 after the token exchange ends to get rid of unnecessary state.
294 if self.
client_id in self.server.token_state:
295 del self.server.token_state[self.
client_id]
298 uri =
"https://example.com/"
300 uri =
"https://example.org/"
303 "device_code":
"postgres",
304 "user_code":
"postgresuser",
311 if interval
is not None:
312 resp[
"interval"] = interval
319 assert self.
_params[
"scope"][0],
"empty scopes should be omitted"
328 resp = {
"error": err}
332 resp[
"error_description"] = desc
340 now = time.monotonic()
345 ), f
"client waited only {delay} seconds between token requests (expected {self._token_state.min_delay})"
362 "token_type":
"bearer",
369 Starts the authorization server on localhost. The ephemeral port in use will
370 be printed to stdout.
373 s = http.server.HTTPServer(("127.0.0.1", 0), OAuthHandler)
383 s.token_state = defaultdict(_TokenState)
387 port = s.socket.getsockname()[1]
391 stdout = sys.stdout.fileno()
398if __name__ ==
"__main__":
void print(const void *obj)
Dict[str, str] _parse_params(self)
JsonObject authorization(self)
None _send_json(self, JsonObject js)
def _remove_token_state(self)
def _get_param(self, name, default)
def _response_padding(self)
bool _should_modify(self)