# ******************************************************
#
# Copyright (c) Extreme Networks Inc. 2014, 2021
# All rights reserved
#
# ******************************************************
import threading
import time
from exos.api import lm, exsys, util
import _exos_ext_aaa as ext
# Open sessions: a dict mapping each session cookie to the ExosUser that owns
# that session.
_SESSIONS = {}
class AaaLibraryError(exsys.ExosError):
    """Common base class for the errors raised by the AAA library."""
class AaaSessionExistsError(AaaLibraryError):
    """Raised when a session is already present but none was expected."""
class AaaSessionDoesNotExistError(AaaLibraryError):
    """Raised when a session is required but none is present."""
def _session_callback(event, cookie):
    """Handle a session event (registered via ``ext.sm_init``).

    Only ``ext.SM_CLEAR_SESSION`` is acted upon: if *cookie* belongs to a
    tracked session, that user's session is ended. Every other event type,
    and any cookie that is not tracked, is ignored.
    """
    if event != ext.SM_CLEAR_SESSION:
        # Other event types are deliberately ignored.
        return
    if cookie not in _SESSIONS:
        return
    _SESSIONS[cookie].end_session()
# Used as a decorator: makes sure the AAA library is ready before the
# decorated request is allowed to proceed.
class AaaReady(lm.LmLibraryWatchBase):
    """Library watch for the AAA library."""

    def get_libraries(self):
        """Return a one-element list holding ``lm.AAA_LIB_NAME``."""
        watched = [lm.AAA_LIB_NAME]
        return watched

    def do_init(self):
        """Initialize the AAA extension via ``ext.aaa_init(False)``."""
        ext.aaa_init(False)
# Used as a decorator: makes sure the SM library is ready before the
# decorated request is allowed to proceed.
class SmReady(lm.LmLibraryWatchBase):
    """Library watch for the SM library."""

    def get_libraries(self):
        """Return a one-element list holding ``lm.SM_LIB_NAME``."""
        watched = [lm.SM_LIB_NAME]
        return watched

    def do_init(self):
        """Initialize the SM extension, registering the session callback."""
        ext.sm_init(_session_callback)
class ExosUser(object):
    """An authenticated EXOS user. Instances of this class should be obtained
    via :func:`authenticate_user`, :func:`authenticate_user_async`, or
    :func:`validate_session`. Do not create instances of this class directly.
    """

    def __init__(self, username, src_addr=None):
        #: The user's name.
        self.username = username
        #: True if the user has read/write privileges
        self.read_write = False
        #: The user's session cookie, if a session has been started.
        self.session_cookie = None
        #: The time, as returned by :func:`time.time()`, the session was started.
        self.session_start = 0
        # For now, auth_by is private. There is little reason to expose it to
        # the user, but it's needed by start_session().
        self._auth_by = ext.AAA_LOCAL_DB
        #: The user's source IP address
        self.src_addr = src_addr

    @SmReady
    def start_session(self, src_addr=None, session_type=None):
        """
        start_session(src_addr)

        Start a session for this user. *src_addr* must be a string in
        IPv4 or IPv6 format; when omitted, the address given at construction
        is used. *session_type* is passed unchanged to the extension's
        ``start_session`` call. The new session's cookie is returned.

        If this user already has a session, :class:`AaaSessionExistsError` is
        raised. :class:`TypeError` is raised when no source address is
        available, and :class:`AaaLibraryError` when the extension reports a
        failure.
        """
        if self.session_cookie:
            raise AaaSessionExistsError()

        # Fall back to the address stored on the instance.
        if not src_addr:
            src_addr = self.src_addr
        if not src_addr:
            raise TypeError("src_addr cannot be None")

        # The cookie identifies the session, so it must not be guessable:
        # draw it from the OS entropy source rather than the default
        # (predictable) Mersenne Twister generator. The format is unchanged:
        # 16 upper-case hexadecimal digits.
        import random
        cookie = "{0:016X}".format(
            random.SystemRandom().randint(0, 0xFFFFFFFFFFFFFFFF))

        ret = ext.start_session(self.username, src_addr, self._auth_by,
                                cookie, session_type)
        if ret != ext.SM_SUCCESS:
            raise AaaLibraryError(
                "Unable to start session, ret={0:d}".format(ret))

        self.session_cookie = cookie
        self.session_start = time.time()
        _SESSIONS[self.session_cookie] = self
        return cookie

    @SmReady
    def end_session(self):
        """
        end_session()

        End a session for this user. If this user does not have a session,
        :class:`AaaSessionDoesNotExistError` is raised.
        """
        if not self.session_cookie:
            raise AaaSessionDoesNotExistError()

        # Drop the registry entry before calling into the extension, and
        # tolerate it already being gone, so a stale registry cannot stop the
        # session from being ended.
        _SESSIONS.pop(self.session_cookie, None)
        ext.end_session(self.session_cookie)
        self.session_cookie = None
# Undecorated utility
def _authenticate_async(callback, username, password, src_addr=None):
    """Submit an authentication request and report the outcome to *callback*.

    *callback* is called with a single argument: an :class:`ExosUser` when
    authentication passed, or ``None`` otherwise.

    *src_addr*, when given, is normalized before use: an IPv4 address is
    converted to its IPv4-mapped IPv6 form, any IPv6 zone (``%...``) is
    removed, and the result is validated. An address that fails validation
    is replaced by ``None`` and authentication proceeds without it.

    Raises :class:`AaaLibraryError` if the request cannot be submitted.
    """
    if src_addr:
        from ipaddress import ip_address

        if ":" not in src_addr:
            # This is an IPv4 address. Convert to IPv6 mapped
            src_addr = "::ffff:{0}".format(src_addr)
        if "%" in src_addr:
            # This address happens to have IPv6 zone. Remove it.
            src_addr = src_addr.split("%")[0]
        try:
            # Check to make sure it is a valid IP address
            src_addr = str(ip_address(src_addr))
        except ValueError:
            # Not a valid IP address
            # Debug log this?
            src_addr = None

    def ext_callback(result, read_write, auth_by):
        # Translate the extension's result into the public callback contract.
        if result != ext.AAA_AUTH_PASSED:
            callback(None)
            return
        user = ExosUser(username, src_addr)
        user._auth_by = auth_by
        user.read_write = bool(read_write)
        callback(user)

    ret = ext.authenticate(ext_callback, username, password, src_addr)
    if ret != ext.AAA_SUCCESS:
        # Plain "{0}" formats any return type; the former "{:s}" raised
        # ValueError for an integer code and hid the real error.
        raise AaaLibraryError("Internal error {0}".format(ret))
@AaaReady
@util.ensure_synchronous
def authenticate_user(username, password, src_addr=None):
    """
    authenticate_user(username, password, src_addr)

    Authenticate a user, given a *username* and *password*, blocking until
    the result is available.
    An :class:`ExosUser` is returned if successful or ``None`` if not.
    """
    outcome = []

    # The lock starts out held; the completion callback releases it, which
    # lets the second acquire() below return once the result has arrived.
    done = threading.Lock()
    done.acquire()

    def on_result(user):
        outcome.append(user)
        done.release()

    _authenticate_async(on_result, username, password, src_addr)
    done.acquire()
    return outcome[0]
@AaaReady
def authenticate_user_async(callback, username, password, src_addr=None):
    """
    authenticate_user_async(callback, username, password, src_addr)

    Non-blocking counterpart of :func:`authenticate_user`. The call returns
    immediately; the result is delivered later by invoking *callback* with a
    single argument, which is an :class:`ExosUser` on success or ``None`` on
    failure.
    """
    _authenticate_async(callback, username, password, src_addr=src_addr)
def validate_session(session_cookie):
    """
    validate_session(session_cookie)

    Look up the session identified by *session_cookie* and return the
    :class:`ExosUser` it belongs to, or ``None`` when no such session is
    known.
    """
    # dict.get yields None for an unknown cookie.
    return _SESSIONS.get(session_cookie)
def get_sessions():
    """
    get_sessions()

    Return the list of active sessions, as a list of :class:`ExosUser`
    objects. The list is a snapshot: sessions started or ended afterwards do
    not change it.
    """
    # Copy into a real list. On Python 3 dict.values() is a live view, and
    # iterating it while a session starts or ends raises RuntimeError.
    return list(_SESSIONS.values())