Source code for exos.api.aaa

# ******************************************************
#
#  Copyright (c) Extreme Networks Inc. 2014, 2021
#  All rights reserved
#
# ******************************************************

import threading
import time
from exos.api import lm, exsys, util
import _exos_ext_aaa as ext


# Open sessions, keyed by session cookie and mapped to the owning ExosUser.
# Entries are added by ExosUser.start_session() and removed by
# ExosUser.end_session().
_SESSIONS = {}


class AaaLibraryError(exsys.ExosError):
    """Root of the exception hierarchy raised by this AAA module."""


class AaaSessionExistsError(AaaLibraryError):
    """Raised when a session is already present but none was expected."""
class AaaSessionDoesNotExistError(AaaLibraryError):
    """Raised when a session is required but none is present."""
def _session_callback(event, cookie):
    """React to an event reported for the session identified by *cookie*.

    Only ``ext.SM_CLEAR_SESSION`` is acted upon: when the cookie belongs to a
    session tracked in ``_SESSIONS``, that user's session is ended.
    """
    if event != ext.SM_CLEAR_SESSION:
        # Ignored other event types
        return

    if cookie in _SESSIONS:
        _SESSIONS[cookie].end_session()


# Decorator that ensures the AAA library is ready before proceeding with the
# request.
class AaaReady(lm.LmLibraryWatchBase):
    """Library watch for the AAA library."""

    def get_libraries(self):
        """Return the names of the libraries this watch depends on."""
        return [lm.AAA_LIB_NAME]

    def do_init(self):
        """Initialise the AAA extension."""
        ext.aaa_init(False)


# Decorator that ensures the SM library is ready before proceeding with the
# request.
class SmReady(lm.LmLibraryWatchBase):
    """Library watch for the SM library."""

    def get_libraries(self):
        """Return the names of the libraries this watch depends on."""
        return [lm.SM_LIB_NAME]

    def do_init(self):
        """Initialise the SM extension, registering the session callback."""
        ext.sm_init(_session_callback)
class ExosUser(object):
    """An authenticated EXOS user.

    Instances of this class should be obtained via :func:`authenticate_user`,
    :func:`authenticate_user_async`, or :func:`validate_session`. Do not create
    instances of this class directly.
    """

    def __init__(self, username, src_addr=None):
        #: The user's name.
        self.username = username

        #: True if the user has read/write privileges
        self.read_write = False

        #: The user's session cookie, if a session has been started.
        self.session_cookie = None

        #: The time, as returned by :func:`time.time()`, the session was started.
        self.session_start = 0

        # For now, auth_by is private. There is little reason to expose it to
        # the user, but it's needed by createSession().
        self._auth_by = ext.AAA_LOCAL_DB

        #: The user's source IP address
        self.src_addr = src_addr

    @SmReady
    def start_session(self, src_addr=None, session_type=None):
        """
        start_session(src_addr)

        Start a session for this user.  *src_addr* must be a string in IPv4 or
        IPv6 format; when omitted, the address stored on the user is used.
        *session_type* is passed through unchanged to the session extension.

        The new session's cookie is returned.

        If this user already has a session, :class:`AaaSessionExistsError` is
        raised.  :class:`TypeError` is raised when no source address is
        available, and :class:`AaaLibraryError` when the extension does not
        report success."""
        if self.session_cookie:
            raise AaaSessionExistsError()

        # Fall back to the address recorded on the user.
        if not src_addr:
            src_addr = self.src_addr
        if not src_addr:
            raise TypeError("src_addr cannot be None")

        # The cookie identifies the session (see validate_session), so it must
        # not be predictable: draw it from the OS entropy source rather than
        # the default Mersenne Twister generator.
        import random

        cookie = "{0:016X}".format(random.SystemRandom().getrandbits(64))

        ret = ext.start_session(
            self.username, src_addr, self._auth_by, cookie, session_type
        )
        if ret != ext.SM_SUCCESS:
            raise AaaLibraryError("Unable to start session, ret={0:d}".format(ret))

        self.session_cookie = cookie
        self.session_start = time.time()
        _SESSIONS[cookie] = self
        return cookie

    @SmReady
    def end_session(self):
        """
        end_session()

        End a session for this user.

        If this user does not have a session,
        :class:`AaaSessionDoesNotExistError` is raised."""
        cookie = self.session_cookie
        if not cookie:
            raise AaaSessionDoesNotExistError()

        # pop() rather than del: if a previous attempt removed the entry but
        # the extension call then failed, a retry must not die on KeyError.
        _SESSIONS.pop(cookie, None)
        ext.end_session(cookie)
        self.session_cookie = None
# Undecorated utility
def _authenticate_async(callback, username, password, src_addr=None):
    """Submit an authentication request and report the outcome via *callback*.

    *callback* is called with an :class:`ExosUser` when authentication passes
    and with ``None`` otherwise.

    *src_addr*, when given, is normalised before use: an address without a
    ``:`` is rewritten as an IPv4-mapped IPv6 address, any ``%zone`` suffix is
    removed, and the result is canonicalised with :mod:`ipaddress`.  A value
    that is not a valid IP address is replaced by ``None``.

    Raises :class:`AaaLibraryError` if the extension does not accept the
    request.
    """
    if src_addr:
        from ipaddress import ip_address

        if ":" not in src_addr:
            # This is an IPv4 address. Convert to IPv6 mapped
            src_addr = "::ffff:{0}".format(src_addr)
        if "%" in src_addr:
            # This address happens to have IPv6 zone. Remove it.
            src_addr = src_addr.split("%")[0]
        try:
            # Check to make sure it is a valid IP address
            src_addr = str(ip_address(src_addr))
        except ValueError:
            # Not a valid IP address
            # Debug log this?
            src_addr = None

    def ext_callback(result, read_write, auth_by):
        # Translate the extension's result into the caller-facing contract.
        if result != ext.AAA_AUTH_PASSED:
            callback(None)
            return
        # src_addr here is the normalised value computed above.
        user = ExosUser(username, src_addr)
        user._auth_by = auth_by
        user.read_write = bool(read_write)
        callback(user)

    ret = ext.authenticate(ext_callback, username, password, src_addr)
    if ret != ext.AAA_SUCCESS:
        # Use a type-agnostic placeholder: the "s" format code rejects
        # non-string values, which would mask this error with a ValueError.
        raise AaaLibraryError("Internal error {0}".format(ret))
@AaaReady
@util.ensure_synchronous
def authenticate_user(username, password, src_addr=None):
    """
    authenticate_user(username, password, src_addr)

    Authenticate a user, given a *username* and *password*.  An
    :class:`ExosUser` is returned if successful or ``None`` if not.

    This call blocks until the authentication result has been delivered.
    """
    # An Event is the purpose-built one-shot signal; unlike a Lock used as a
    # latch, setting it a second time is harmless.
    finished = threading.Event()
    outcome = []

    def callback(result):
        outcome.append(result)
        finished.set()

    _authenticate_async(callback, username, password, src_addr)
    finished.wait()
    return outcome[0]
@AaaReady
def authenticate_user_async(callback, username, password, src_addr=None):
    """
    authenticate_user_async(callback, username, password, src_addr)

    Non-blocking counterpart of :func:`authenticate_user`.  The request is
    submitted and control returns to the caller; the outcome is delivered by
    invoking *callback* with a single argument, which is an :class:`ExosUser`
    on success or ``None`` on failure.
    """
    _authenticate_async(callback, username, password, src_addr=src_addr)
def validate_session(session_cookie):
    """
    validate_session(session_cookie)

    Look up the session identified by *session_cookie* and return the
    :class:`ExosUser` that owns it, or ``None`` when no such session is known.
    """
    return _SESSIONS.get(session_cookie)
def get_sessions():
    """
    get_sessions()

    Return the list of active sessions, as :class:`ExosUser` objects.

    The result is a snapshot: sessions started or ended afterwards do not
    alter the returned list.
    """
    # Copy into a list instead of returning the live dict view, so the result
    # is a list as documented and callers can iterate it while sessions are
    # being started or ended.
    return list(_SESSIONS.values())