# ******************************************************
#
# Copyright (c) Extreme Networks Inc. 2014, 2021
# All rights reserved
#
# ******************************************************
# Non-public utilities. Things that are useful when implementing the API.
import functools
import sys
import threading
import logging
import Queue
from concurrent import futures
from _exos_ext_exsys import ExosError
from _exos_ext_dm import get_port_string, get_platform_has_slots
# Logger used by this module; registered under the name "pyapi".
logger = logging.getLogger("pyapi")
#: Default thread pool for executing callbacks. This gets us off the
# dispatcher thread quickly and safely. The thread will not be created until
# something uses it.
# The pool is limited to a single worker thread (max_workers=1).
_callback_pool = futures.ThreadPoolExecutor(max_workers=1)
class Enum(set):
    """Simple set-backed enum.

    The members are the elements of the set. Accessing a member as an
    attribute returns the member name itself, e.g. ``Enum(["UP"]).UP == "UP"``.

    Adapted from:
    http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
    """

    #: Default mapping into this enum. Used by map_enum() and map_val() when
    #: no explicit mapping is passed. Keys are external values, values are
    #: enum member names.
    _mapping = {}

    def __getattr__(self, name):
        """Return *name* if it is a member of this enum.

        Only invoked when normal attribute lookup fails.

        Raises:
            AttributeError: if *name* is not a member. The message names the
                missing attribute so the failure is diagnosable.
        """
        if name in self:
            return name
        raise AttributeError(
            "{0!r} object has no attribute {1!r}".format(type(self).__name__, name)
        )

    def map_enum(self, val, default, mapping=None):
        """Map a value *val* into this enum using *mapping*.

        If *mapping* is None, ``self._mapping`` is used. If *val* is not a
        key of the mapping, *default* is returned.
        """
        if mapping is None:
            mapping = self._mapping
        return mapping[val] if val in mapping else default

    def map_val(self, enum_attr, default, mapping=None):
        """Map an enum member *enum_attr* back to a value; inverse of map_enum.

        If *mapping* is None, ``self._mapping`` is used. *default* is returned
        when *enum_attr* is not a member of this enum or no entry of the
        mapping points at it. When several keys map to the same member, the
        first one found while iterating the mapping is returned.
        """
        if enum_attr not in self:
            return default
        if mapping is None:
            mapping = self._mapping
        return next(
            (val for val, attr in mapping.items() if attr == enum_attr),
            default,
        )
def memoized_property(fget):
    """Build a read-only property that evaluates *fget* at most once per object.

    The computed value is stored on the instance under the getter's name
    prefixed with an underscore (``_<name>``); later reads return that stored
    attribute instead of calling *fget* again.
    """
    cache_name = "_" + fget.__name__

    @functools.wraps(fget)
    def cached_getter(self):
        already_cached = hasattr(self, cache_name)
        if not already_cached:
            computed = fget(self)
            setattr(self, cache_name, computed)
        return getattr(self, cache_name)

    return property(cached_getter)
class DeadlockDetectedError(ExosError):
    """An imminent deadlock was detected, typically because a synchronous call
    was made from an asynchronous context."""
def ensure_synchronous(f):
    """Internal decorator to ensure *f* is called from a synchronous context.

    Each call compares the current thread with ``sys.pyext_dispatcher``. If
    they match, the call is being made on the dispatcher thread and could
    deadlock, so it is refused.

    NOTE(review): ``sys.pyext_dispatcher`` is not set anywhere in this file;
    presumably the hosting runtime installs it. If it is missing, the wrapped
    call raises AttributeError.

    Raises:
        DeadlockDetectedError: when invoked on the dispatcher thread.
    """

    @functools.wraps(f)
    def check_dispatcher_thread(*args, **kwargs):
        # current_thread() is the supported spelling; currentThread() is a
        # deprecated alias.
        if sys.pyext_dispatcher == threading.current_thread():
            raise DeadlockDetectedError(
                "A synchronous call was made from an asynchronous context, resulting "
                "in a deadlock. Be sure to make only asynchronous calls from a callbacks."
            )
        return f(*args, **kwargs)

    return check_dispatcher_thread
class SlotPort(tuple):
    """Represent a slot:port on the running switch. If the running switch does
    not support slots, then the slot will be ignored.

    SlotPort can be created with just a port, a slot and a port, or with an
    integer in slotPort format, which is a 32-bit integer with the slot in the
    top 16 bits and port in the lower 16 bits.

    The instance is a 2-tuple ``(slot, port)``.
    """

    @classmethod
    def has_slots(cls):
        """Return True if the running switch supports slots."""
        return bool(get_platform_has_slots())

    def __new__(cls, a, b=None):
        """Create a SlotPort.

        * ``SlotPort(port)`` -- *a* <= 0xFFFF is taken as a port on slot 1.
        * ``SlotPort(slotPort)`` -- *a* > 0xFFFF is decoded as a packed
          slotPort integer (slot in the top 16 bits, port in the low 16).
        * ``SlotPort(slot, port)`` -- both values given explicitly.
        """
        if b is not None:
            slot, port = a, b
        elif a > 0xFFFF:
            slot = (a & 0xFFFF0000) >> 16
            port = a & 0x0000FFFF
        else:
            slot, port = 1, a
        # Build through cls (not a hard-coded SlotPort) so subclasses get
        # instances of their own type.
        return tuple.__new__(cls, (slot, port))

    @property
    def slot(self):
        """The slot number (first tuple element)."""
        return self[0]

    @property
    def port(self):
        """The port number (second tuple element)."""
        return self[1]

    @property
    def slotPort(self):  # noqa
        """This SlotPort in EXOS slotPort format, which is a 32-bit integer
        with the slot in the top 16 bits and port in the lower 16 bits.
        """
        return ((self[0] << 16) & 0xFFFF0000) | (self[1] & 0x0000FFFF)

    def __str__(self):
        # Formatting is delegated to get_port_string(slot, port).
        return get_port_string(self[0], self[1])

    def __repr__(self):
        return "SlotPort({},{})".format(self[0], self[1])
def get_slotPort_from_tuple(tup):  # noqa
    """Pack a port description into the 32-bit slotPort integer format.

    *tup* may be:

    * a bare port number (not a tuple) -- slot defaults to 1;
    * a 1-tuple ``(port,)`` -- slot defaults to 1;
    * a tuple ``(slot, port)``.

    Returns an integer with the slot in the top 16 bits and the port in the
    lower 16 bits.
    """
    if not isinstance(tup, tuple):
        # passed in (port)
        slot, port = 1, tup
    elif len(tup) == 1:
        # passed in (port,): the only element is the port, not the slot.
        slot, port = 1, tup[0]
    else:
        # passed in (slot,port)
        slot, port = tup[0], tup[1]
    return ((slot << 16) & 0xFFFF0000) | (port & 0x0000FFFF)
def get_tuple_from_slotPort(slotPort):  # noqa
    """Unpack a 32-bit slotPort integer.

    The top 16 bits are the slot and the lower 16 bits are the port. When the
    slot is 1 only the bare port number is returned; otherwise the result is
    the tuple ``(slot, port)``.
    """
    port = slotPort & 0x0000FFFF
    slot = (slotPort & 0xFFFF0000) >> 16
    return port if slot == 1 else (slot, port)
########
# We forked obsub here.
# - All references to signatures are invalid, since that support has been
#   removed (it is py3 only).
# https://github.com/aepsil0n/obsub
##
class Observable(object):
    """Decorator object that wraps a function in an ``Observed`` event.

    An instance is created first, optionally with a *decorator*, and is then
    applied to the function that should become an event. Applying it builds
    an ``Observed`` around the function, forwards the stored *decorator* to
    it, remembers the result as ``self.wrapper`` and returns it.

    Sketch of the intended shape of use (not a doctest)::

        class Notify(NotifyBase):
            @Observable()
            def progress(self, first, second):
                return {"first": first, "second": second}

    NOTE(review): this code is a fork of obsub. The upstream documentation
    (the ``@event`` decorator, ``+=`` registration and call-signature
    enforcement) does not describe this fork; observers are registered via
    ``Observed.add_observer`` -- confirm against callers.
    """

    def __init__(self, decorator=None):
        """
        Constructor.

        * decorator -- Optional callable handed on to ``Observed``. When it is
          truthy, its docstring and other wrapper attributes are copied onto
          this object.
        """
        if decorator:
            # Copy docstring and other attributes from the decorator.
            functools.update_wrapper(self, decorator)
        # Stored unconditionally (may be None) and passed to Observed later.
        self._decorator = decorator

    def __call__(self, function):
        """Wrap *function* in an ``Observed`` and return that object."""
        observed = Observed(function, decorator=self._decorator)
        self.wrapper = observed
        return observed
class Observed(object):
    """Private helper class for event system.

    Acts as a descriptor on the owning class. Observers are kept per
    instance, in a list stored in the instance ``__dict__`` under the key
    ``"_" + <function name>``.

    NOTE(review): ``__get__`` records the most recently accessed instance on
    this (class-level) object, so a reference obtained from one instance
    operates on whichever instance was accessed last.
    """

    def __init__(self, function, decorator=None):
        """
        Constructor.

        * function -- the function being turned into an event.
        * decorator -- optional callable; when truthy it is applied to
          ``add_observer`` so it can intercept observer registration.
        """
        if decorator:
            self.add_observer = decorator(self.add_observer)
        self._function = function
        self._key = "_" + function.__name__
        # Wrap the class. This pulls up the doc strings
        functools.update_wrapper(self, self._function, updated=())

    @property
    def _observers(self):
        """Observer list of the current instance, created on first use."""
        if self._key not in self.instance.__dict__:
            self.instance.__dict__[self._key] = []
        return self.instance.__dict__[self._key]

    def add_observer(self, function):
        """Register *function* as an event handler on the current instance."""
        self._observers.append(function)

    def remove_observer(self, function):
        """Remove the function, if known, from the list of registered event
        handlers."""
        try:
            self._observers.remove(function)
        except ValueError:
            pass

    def __set__(self, instance, value):
        # Assignments to the event attribute are deliberately ignored.
        pass

    def __get__(self, instance, owner):
        # this case corresponds to access via the owner class:
        if instance is None:
            @functools.wraps(self._function)
            def wrapper(instance, *args, **kwargs):
                return self.__get__(instance, owner)(*args, **kwargs)
        else:
            self.instance = instance
            wrapper = functools.wraps(self._function)(self)
        return wrapper

    def __call__(self, *args, **kwargs):
        """
        Calls the observable, then takes its return and passes it
        to the observers as keyword arguments. This allows us to modify the
        event attributes dictionary before passing it on.

        * *args -- Arguments given to the observable.
        * **kwargs -- Keyword arguments passed to the observable.

        Exceptions raised by an observer are logged and do not stop the
        remaining observers. Returns the observable's own return value.
        """
        result = self._function(self.instance, *args, **kwargs)
        # Call all registered event handlers. Iterate over a snapshot so an
        # observer that adds or removes handlers during dispatch cannot cause
        # another handler to be skipped.
        for f in list(self._observers):
            try:
                f(**result)
            except Exception:
                logger.warning("Unhandled exception in observer.", exc_info=True)
        return result
class NotifyBase(object):
    """Abstract base class for Notify objects."""

    def add_observers(self, observers):
        """Attach every matching public callable of *observers* to this object.

        Each attribute of *observers* whose name does not start with an
        underscore and whose value is callable is offered to add_observer()
        under its own name. Names that do not correspond to an Observable
        here are skipped with a debug log entry.
        """
        public_names = [n for n in dir(observers) if not n.startswith("_")]
        for attr_name in public_names:
            candidate = getattr(observers, attr_name)
            # Anything callable qualifies, not only real methods.
            if callable(candidate):
                try:
                    # Already bound, so it can be registered as-is.
                    self.add_observer(candidate, attr_name)
                except (AttributeError, TypeError):
                    # No such observable here; presumably not meant for us.
                    logger.debug(
                        "ignoring callable without observable %s: %s",
                        attr_name,
                        candidate,
                    )

    def add_observer(self, observer, name):
        """Register *observer* on the Observable attribute called *name*.

        Equivalent to ``notify.<name>.add_observer(observer)``.

        Raises AttributeError if this object has no attribute *name*, and
        TypeError if that attribute is not an Observable.
        """
        # getattr raises AttributeError for an unknown name.
        target = getattr(self, name)
        if isinstance(target, Observed):
            target.add_observer(observer)
            return
        raise TypeError("Not an Observable")
class DroppingPoolExecutor(futures.ThreadPoolExecutor):
    """ThreadPoolExecutor whose pending-work queue is bounded.

    When the queue already holds *max_queue_size* items, newly submitted
    work is silently dropped instead of being queued.
    """

    def __init__(self, max_workers, max_queue_size):
        """
        * max_workers -- passed through to ThreadPoolExecutor.
        * max_queue_size -- maximum number of queued (not yet started) items.
        """
        super(DroppingPoolExecutor, self).__init__(max_workers)
        # Replace the executor's internal queue with a bounded one.
        # NOTE(review): relies on the private ``_work_queue`` attribute of
        # ThreadPoolExecutor.
        self._work_queue = Queue.Queue(maxsize=max_queue_size)

    def submit(self, *args, **kwds):
        """Submit work unless the queue is full.

        Returns the Future produced by ThreadPoolExecutor.submit(), or None
        when the work was dropped because the queue was full.
        """
        if self._work_queue.full():
            return None
        return super(DroppingPoolExecutor, self).submit(*args, **kwds)
if sys.version_info[0] >= 3:
    def _reraise(exc_type, exc_value, exc_tb):
        """Re-raise *exc_value* with the traceback *exc_tb* (Python 3 form)."""
        raise exc_value.with_traceback(exc_tb)
else:
    # The three-argument raise statement is a syntax error on Python 3, so it
    # is compiled only when actually running on Python 2.
    exec(
        "def _reraise(exc_type, exc_value, exc_tb):\n"
        "    raise exc_type, exc_value, exc_tb\n"
    )


class SynchronousPoolExecutor(object):
    """A concurrent.futures Executor that executes its jobs synchronously,
    meaning that it blocks until they are done and runs them in the current
    thread. Useful when you want the Executor/Future model, but not the extra
    thread."""

    class SyncFuture(object):
        """Future for a job that has already run to completion."""

        def __init__(self):
            self._res = None  # return value of the job, if it succeeded
            self._ex = None   # exception type, if the job raised
            self._v = None    # exception instance, if the job raised
            self._tb = None   # traceback of the exception, if the job raised

        def cancelled(self):
            """Always False; a synchronous job cannot be cancelled."""
            return False

        def done(self):
            """Always True; the job has finished before the future is returned."""
            return True

        def exception(self, timeout=None):
            """Return the exception instance raised by the job, or None.

            *timeout* is accepted for signature compatibility and ignored.
            """
            return self._v

        def result(self, timeout=None):
            """Return the job's result, re-raising its exception if it failed.

            The exception is re-raised with the traceback captured when the
            job ran. *timeout* is accepted for signature compatibility and
            ignored.
            """
            if self._ex is not None:
                _reraise(self._ex, self._v, self._tb)
            return self._res

        def add_done_callback(self, fn):
            """Call *fn* with this future immediately, since it is already done."""
            fn(self)

    def submit(self, fn, *args, **kwds):
        """Run ``fn(*args, **kwds)`` now, in the calling thread.

        Returns a SyncFuture holding either the result or the captured
        exception information.
        """
        f = self.SyncFuture()
        try:
            f._res = fn(*args, **kwds)
        except Exception:
            # Keep the stack trace around so we can re-raise with it.
            f._ex, f._v, f._tb = sys.exc_info()
        return f

    def shutdown(self, wait=True):
        """No-op; there are no worker threads to stop."""
        pass