16from collections
import defaultdict
21 Core implementation of the authorization server. The API is
22 inheritance-based, with entry points at do_GET() and do_POST(). See the
23 documentation for BaseHTTPRequestHandler.
26 JsonObject = dict[str, object]
30 Switches the behavior of the provider depending on the issuer URI.
33 self.path.startswith("/alternate/")
34 or self.
path ==
"/.well-known/oauth-authorization-server/alternate"
40 if self.
path.startswith(
"/.well-known/"):
41 self.
path = self.
path.removesuffix(
"/alternate")
43 self.
path = self.
path.removeprefix(
"/alternate")
45 self.
path = self.
path.removeprefix(
"/param")
49 Checks the expected value of the Authorization header, if any.
51 secret = self._get_param("expected_secret",
None)
55 assert "Authorization" in self.headers
56 method, creds = self.headers[
"Authorization"].split()
59 raise RuntimeError(f
"client used {method} auth; expected Basic")
61 username = urllib.parse.quote_plus(self.
client_id)
62 password = urllib.parse.quote_plus(secret)
63 expected_creds = f
"{username}:{password}"
65 if creds.encode() != base64.b64encode(expected_creds.encode()):
67 f
"client sent '{creds}'; expected b64encode('{expected_creds}')"
74 config_path =
"/.well-known/openid-configuration"
76 config_path =
"/.well-known/oauth-authorization-server"
78 if self.
path == config_path:
81 self.send_error(404,
"Not Found")
88 Parses apart the form-urlencoded request body and returns the resulting
91 size = int(self.headers["Content-Length"])
92 form = self.rfile.
read(size)
94 assert self.headers[
"Content-Type"] ==
"application/x-www-form-urlencoded"
95 return urllib.parse.parse_qs(
98 keep_blank_values=
True,
106 Returns the client_id sent in the POST body or the Authorization header.
109 if "client_id" in self.
_params:
110 return self.
_params[
"client_id"][0]
112 if "Authorization" not in self.headers:
113 raise RuntimeError(
"client did not send any client_id")
115 _, creds = self.headers[
"Authorization"].split()
117 decoded = base64.b64decode(creds).decode(
"utf-8")
118 username, _ = decoded.split(
":", 1)
120 return urllib.parse.unquote_plus(username)
135 if self.
path ==
"/authorize":
137 elif self.
path ==
"/token":
147 Returns True if the client has requested a modification to this stage of
150 if not hasattr(self,
"_test_params"):
159 and self.
path ==
"/.well-known/openid-configuration"
161 or (stage ==
"device" and self.
path ==
"/authorize")
162 or (stage ==
"token" and self.
path ==
"/token")
167 If the client has requested a modification to this stage (see
168 _should_modify()), this method searches the provided test parameters for
169 a key of the given name, and returns it if found. Otherwise the provided
180 Returns "application/json" unless the test has requested something
183 return self.
_get_param(
"content_type",
"application/json")
188 Returns 0 unless the test has requested something different.
195 Returns "authorization_pending" unless the test has requested something
198 return self.
_get_param(
"retry_code",
"authorization_pending")
203 Returns "verification_uri" unless the test has requested something
206 return self.
_get_param(
"uri_spelling",
"verification_uri")
211 If the huge_response test parameter is set to True, returns a dict
212 containing a gigantic string value, which can then be folded into a JSON
215 if not self.
_get_param(
"huge_response",
False):
218 return {
"_pad_":
"x" * 1024 * 1024}
223 The actual Bearer token sent back to the client on success. Tests may
224 override this with the "token" test parameter.
227 if token
is not None:
238 Sends the provided JSON dict as an application/json response.
241 resp = json.dumps(js).encode("ascii")
242 self.log_message(
"sending JSON response: %s", resp)
246 self.send_header(
"Content-Length",
str(
len(resp)))
249 self.wfile.
write(resp)
252 port = self.server.socket.getsockname()[1]
254 issuer = f
"http://localhost:{port}"
256 issuer +=
"/alternate"
262 "token_endpoint": issuer +
"/token",
263 "device_authorization_endpoint": issuer +
"/authorize",
264 "response_types_supported": [
"token"],
265 "subject_types_supported": [
"public"],
266 "id_token_signing_alg_values_supported": [
"RS256"],
267 "grant_types_supported": [
268 "authorization_code",
269 "urn:ietf:params:oauth:grant-type:device_code",
276 A cached _TokenState object for the connected client (as determined by
277 the request's client_id), or a new one if it doesn't already exist.
279 This relies on the existence of a defaultdict attached to the server;
282 return self.server.token_state[self.
client_id]
286 Removes any cached _TokenState for the current client_id. Call this
287 after the token exchange ends to get rid of unnecessary state.
289 if self.
client_id in self.server.token_state:
290 del self.server.token_state[self.
client_id]
293 uri =
"https://example.com/"
295 uri =
"https://example.org/"
298 "device_code":
"postgres",
299 "user_code":
"postgresuser",
306 if interval
is not None:
307 resp[
"interval"] = interval
314 assert self.
_params[
"scope"][0],
"empty scopes should be omitted"
319 if err := self.
_get_param(
"error_code",
None):
322 resp = {
"error": err}
324 resp[
"error_description"] = desc
332 now = time.monotonic()
337 ), f
"client waited only {delay} seconds between token requests (expected {self._token_state.min_delay})"
354 "token_type":
"bearer",
361 Starts the authorization server on localhost. The ephemeral port in use will
362 be printed to stdout.
365 s = http.server.HTTPServer(("127.0.0.1", 0), OAuthHandler)
375 s.token_state = defaultdict(_TokenState)
379 port = s.socket.getsockname()[1]
383 stdout = sys.stdout.fileno()
390if __name__ ==
"__main__":
void print(const void *obj)
JsonObject authorization(self)
None _send_json(self, JsonObject js)
def _remove_token_state(self)
def _get_param(self, name, default)
def _response_padding(self)
bool _should_modify(self)
dict[str, str] _parse_params(self)