Source code for comotion.auth

from http.server import BaseHTTPRequestHandler, HTTPServer

import base64
import hashlib
import json
import os
import re
import requests
import webbrowser
import jwt

from abc import ABC, abstractmethod

from urllib.parse import urlparse, parse_qs, urlencode
import uuid


[docs]class OIDCredirectHandler(BaseHTTPRequestHandler):
[docs] def setup(self): self.code = None self.error = None self.errorDescription = None self.state = None super(BaseHTTPRequestHandler, self).setup() global latest_handled_request latest_handled_request = self
[docs] def log_message(self, format, *args): """ Overrider log_message to prevent from logging of requests """ pass
def _read_query_parameters(self, query_string): if query_string == '': return parsed_qs = parse_qs(query_string) if 'code' in parsed_qs: self.code = parsed_qs['code'] if 'error' in parsed_qs: self.error = parsed_qs['error'] if 'error_description' in parsed_qs: self.errorDescription = parsed_qs['error_description'] if 'state' in parsed_qs: self.state = parsed_qs['state'] def _process_code(self): payload = { "grant_type": "authorization_code", "code": self.code, "redirect_uri": "http://localhost:%s" % (self.server.server_port), "client_id": "comotion_cli", "code_verifier": self.server.pkce.get_code_verifier() } response = requests.post( self.server.como_authenticator.token_endpoint, data=payload ) try: json_response = json.loads(response.text) except json.JSONDecodeError: self.error = "Trouble getting your security tokens: " + response.text return if response.status_code != 200: if "error" in json_response and "error_description" in json_response: self.error = json_response["error_description"] else: self.error = "Could not get a token for you." return self.id_token = json_response['id_token'] self.id_token_decoded = jwt.decode( self.id_token, options={"verify_signature": False} ) try: self.server.como_authenticator.credentials_cache.set_refresh_token( self.id_token_decoded['preferred_username'], json.loads(str(response.text))['refresh_token']) except CredentialsCacheException as e: self.error = f"There was a problem saving your credentials: {str(e)}" # Handler for the redirect from keycloak
[docs] def do_GET(self): self._read_query_parameters(urlparse(self.path).query) self._process_code() self.send_response(200) self.end_headers() if self.error is not None: myheader = "There was a problem authenticating you" issuer_message = self.error final_message = "If the issue persists, please get in touch with support" else: myheader = "Authentication complete for " + self.id_token_decoded['preferred_username'] issuer_message = f"Authenticated at: {self.id_token_decoded['iss']}" final_message = f"Go ahead and close this window, your keys are saved in your computer's credentials manager." message=""" <html> <head> <title>Comotion Auth</title> </head> <body> <span style="font-family:'Courier New'"> <br/> <center> <h1> Comotion Auth </h1> <h2> {myheader} </h2> {issuer_message} <br/> <br/> {final_message} </center> </span> </body> <html> """.format(myheader=myheader, issuer_message=issuer_message, final_message=final_message) self.wfile.write(bytes(message,"utf-8")) return
[docs]class OIDCServer(HTTPServer): def __init__(self, como_authenticator, pkce): self.pkce = pkce self.como_authenticator = como_authenticator super(HTTPServer, self).__init__(('', 0), OIDCredirectHandler)
[docs]class PKCE(): PKCE_CODE_VERIFIER_MAX_LENGTH = 40 # https://tools.ietf.org/html/rfc7636#section-4.1 def __init__(self, code_challenge, code_verifier): self.code_challenge = code_challenge self.code_verifier = code_verifier
[docs] def get_code_challenge(self): return self.code_challenge
[docs] def get_code_verifier(self): return self.code_verifier
[docs] @staticmethod def generate_pkce(): code_verifier = base64\ .urlsafe_b64encode(os.urandom(PKCE.PKCE_CODE_VERIFIER_MAX_LENGTH))\ .decode('utf-8') code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier) code_challenge = hashlib\ .sha256(code_verifier.encode('utf-8'))\ .digest() code_challenge = base64\ .urlsafe_b64encode(code_challenge).\ decode('utf-8') code_challenge = code_challenge\ .replace('=', '') return PKCE(code_challenge, code_verifier)
[docs]class CredentialsCacheException(Exception): """ CredentialsCacheException Thrown in credential save and retrieve in classes implementing CredentialsCacheInterface """ pass
[docs]class CredentialsCacheInterface(): """ Interfaced to be implemented in concrete class that executes caching of credentials for users Any errors in the functions should raise a CredentialsCacheException """ def __init__(self, issuer, orgname): self.issuer = issuer self.orgname = orgname
[docs] def get_current_user(self): """ Get latest authenticated user for the issuer and orgname Returns: str: preferred_username """ pass
[docs] def get_offline_token(self): """ Get offline token for the current user from cache """ pass
[docs] def set_offline_token(self, username, token): """ Set offline token for current user and update current user """ pass
[docs]class KeyringCredentialCache(CredentialsCacheInterface): """ Credential cache using the python keyring class. Saves to local keyring available on linux, windows and macosx """ def _get_token_key(self): return "comotion auth api offline token (%s/auth/realms/%s)" % (self.issuer, self.orgname) # noqa: E501 def _get_username_key(self): return 'comotion auth api latest username (%s)' % (self.issuer)
[docs] def get_current_user(self): import keyring return keyring.get_password( self._get_username_key(), self.orgname )
[docs] def get_refresh_token(self): import keyring token_key = self._get_token_key() return keyring.get_password( token_key, self.get_current_user() )
[docs] def set_refresh_token(self, username, token): import keyring try: # save latest username keyring.set_password( self._get_username_key(), self.orgname, username ) keyring.set_password( self._get_token_key(), username, token ) except keyring.errors.KeyringError as e: raise CredentialsCacheException(e)
[docs]class AuthException(Exception): """ Exception thrown by Auth class """ pass
[docs]class UnAuthenticatedException(AuthException): """ Exception thrown when credentials are not valid. """ pass
[docs]class Auth(): """ Class that authenticates the user or application, caches credentials. Args: orgname (str): The name of the organization. issuer (str): The issuer URL for authentication. Defaults to 'https://auth.comotion.us'. credentials_cache_class (class): The class used for credential caching. Defaults to KeyringCredentialCache. entity_type (str): The type of entity being authenticated (Auth.USER or Auth.APPLICATION). Defaults to Auth.USER. application_client_id (str, optional): The client ID for the application on auth.comotion.us. When entity_type is Auth.USER, defaults to `comotion_cli` application_client_secret (str, optional): The client secret for the application on auth.comotion.us. Only valid when entity_type is Auth.APPLICATION. """ USER='user' """ Constant for user entity type. """ APPLICATION='application' """ Constant for application entity type. """ def __init__(self, orgname, issuer='https://auth.comotion.us', credentials_cache_class=None, entity_type=None, application_client_id=None, application_client_secret=None ): if credentials_cache_class is None: credentials_cache_class = KeyringCredentialCache if entity_type is None or entity_type==Auth.USER: entity_type = Auth.USER if application_client_secret is not None: raise ValueError("when entity_type is Auth.Application, application_client_id and application_client_secret must not be provided") if application_client_id is None: self.application_client_id = "comotion_cli" # default to comotion_cli if Auth.Application elif entity_type==Auth.APPLICATION: if application_client_id is None or application_client_secret is None: raise ValueError("when entity_type is Auth.Application, application_client_id and application_client_secret must be provided") self.application_client_id = application_client_id self.application_client_secret = application_client_secret else: raise ValueError("entity_type must be either Auth.USER or Auth.APPLICATION") self.issuer = issuer self.orgname = orgname self.entity_type = entity_type self.auth_endpoint = "%s/auth/realms/%s/protocol/openid-connect/auth" % (issuer,orgname) # noqa self.token_endpoint = "%s/auth/realms/%s/protocol/openid-connect/token" % (issuer,orgname) # noqa self.logout_endpoint = "%s/auth/realms/%s/protocol/openid-connect/logout" % (issuer,orgname) # noqa self.delegated_endpoint = "%s/auth/realms/%s/account" % (issuer,orgname) # noqa self.refresh_token = None self.credentials_cache = credentials_cache_class(issuer, orgname) def _build_auth_url(self, redirect_uri, state, pkce): query_params = { "response_type": "code", "client_id": "comotion_cli", "redirect_uri": redirect_uri, "scope": "openid", "state": state, "code_challenge": pkce.get_code_challenge(), "code_challenge_method": "S256" } query_params = "?" + urlencode(query_params) return self.auth_endpoint + query_params
[docs] def authenticate(self): """ used by the CLI to run a user authentication process using the auth code flow with auth.comotion.us Note that the handle_request function (part of the OIDCServer class) saves the key to an appropriate key manager """ import logging logging.captureWarnings(True) logging.getLogger().setLevel(logging.DEBUG) try: state = str(uuid.uuid4()) pkce = PKCE.generate_pkce() # Create a web server and define the handler to manage the incoming request server = OIDCServer( self, pkce ) redirect_server_address = "http://localhost:%s" % (server.server_port) # noqa auth_url = self._build_auth_url( redirect_server_address, state, pkce) server.timeout = 180 webbrowser.open(auth_url) # Wait for timeout for one http request server.handle_request() server.server_close() finally: server.socket.close()
[docs] def get_access_token(self): """ Retrieve an access token from the auth provider. This method interacts with the authentication provider to retrieve an access token. The access token is not cached and must be retrieved each time this method is called. The method handles both user and application entity types, using the appropriate authentication mechanism for each. Returns: str: The access token retrieved from the auth provider. Raises: UnAuthenticatedException: If there is an error retrieving the access token from the auth provider. """ try: if self.entity_type == Auth.USER: refresh_token = self.credentials_cache.get_refresh_token() payload = { "grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": self.application_client_id } else: payload = { "grant_type": "client_credentials", "client_id": self.application_client_id, "client_secret": self.application_client_secret } response = requests.post( self.token_endpoint, data=payload ) if response.status_code == requests.codes.ok: return json.loads(str(response.text))['access_token'] else: json_response = json.loads(str(response.text)) if 'error' in json_response: if json_response['error'] == 'invalid_grant': raise UnAuthenticatedException("Your credentials are not valid. Run `comotion authenticate` to refresh your credentials.") else: raise UnAuthenticatedException("There was a problem with the request: " + json_response.get('error_description', "unknown system error. This is what the system is returning: " + response.text) + f" ({json_response['error']})") else: raise UnAuthenticatedException("unknown system error. This is what the system is returning: " + response.text) except json.JSONDecodeError as e: raise UnAuthenticatedException(f"There was a strange response from the server: '{response.text}' ({e.msg})") except requests.RequestException as e: raise UnAuthenticatedException(f"Request failed: {e}")