Source code for exos.api.ems

# ******************************************************
#
#  Copyright (c) Extreme Networks Inc. 2014, 2021
#  All rights reserved
#
# ******************************************************

import logging
from exos.api import util, dispatch
import _exos_ext_ems as ext

# Bring the exception types into this module
from _exos_ext_ems import (  # noqa: F401
    UnknownTraceBufferError,
    TraceBufferAlreadyExistsError,
    LogComponentRegisteredError,
    LogComponentNotRegisteredError,
)

# EXOS severity levels, built as a util.Enum from the names listed below.
# ExosLogger translates each member twice: to an EMS severity constant
# (ExosLogger._sevmap_ems) and to a Python logging level
# (ExosLogger._sevmap_logging).  The logging mapping is not one-to-one:
# NOTICE shares logging.INFO with INFO, and every DEBUG* member maps to
# logging.DEBUG.
Severity = util.Enum(
    ["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG", "DEBUG_SUMMARY", "DEBUG_VERBOSE", "DEBUG_DATA"]
)


class TraceBuffer(ext.TraceBuffer):
    """A circular in-memory log buffer."""

    def __init__(self, buffer_name, buffer_size):
        super(TraceBuffer, self).__init__(buffer_name, buffer_size)

    def dump(self, file_name=None):
        """
        dump(file_name)

        Write the contents of this trace buffer to *file_name*.

        A file name is generated when *file_name* is ``None``. The file is
        always written under ``/usr/local/tmp``.
        """
        super(TraceBuffer, self).dump(file_name)

    def clear(self):
        """
        clear()

        Empty this trace buffer, deleting every existing log entry.
        """
        super(TraceBuffer, self).clear()

    def log(self, msg):
        """
        log(msg)

        Append *msg* to the trace buffer.

        Messages longer than 900 characters are written as several
        entries; every entry after the first is prefixed with ``">>> "``.
        An empty *msg* produces no entry at all.
        """
        # EMS stops retrieving entries (and leaves its loop early) when it
        # meets one longer than emsProcessTrace.lineBuffer (1024), so long
        # messages are split up here as a workaround.  900 rather than 1024
        # leaves room for the "timestamp <process:buffer>" prefix.
        piece_len = 900
        write = super(TraceBuffer, self).log
        for offset in range(0, len(msg), piece_len):
            piece = msg[offset : offset + piece_len]
            # Only continuation pieces carry the marker.
            write(piece if offset == 0 else ">>> " + piece)
class TraceBufferHandler(logging.Handler):
    """A :py:class:`logging.Handler` that's backed by a :class:`TraceBuffer`.

    Allows the standard Python logging mechanism to be used with trace
    buffers. *buffer_name* and *buffer_size* are used to create the
    :class:`TraceBuffer`.
    """

    def __init__(self, buffer_name, buffer_size):
        """Create a new handler, backed by a trace buffer named *buffer_name*.

        :class:`TraceBufferAlreadyExistsError` is thrown if the trace buffer
        has already been created.
        """
        super(TraceBufferHandler, self).__init__()

        #: The underlying trace buffer
        self.trace_buffer = TraceBuffer(buffer_name, buffer_size)

    def emit(self, record):
        """Write *record* to the trace buffer.

        The formatted message is logged first. If the record carries
        exception information, the traceback follows, one physical line per
        trace buffer entry. *record* is left exactly as it was received so
        that other handlers still see the exception.
        """
        # Don't let the formatter format an exception.  It'll use \n and EMS
        # hates that.  Both fields have to be hidden: Formatter.format()
        # appends a cached ``exc_text`` (left behind by a handler that ran
        # earlier) even when ``exc_info`` is None.
        exc = record.exc_info
        exc_text = record.exc_text
        record.exc_info = None
        record.exc_text = None
        try:
            msg = self.format(record)
        finally:
            # Put everything back, even if formatting failed, so other
            # handlers have it.
            record.exc_info = exc
            record.exc_text = exc_text

        self.trace_buffer.log(msg)

        # Now, deal with the exception by adding it one line at a time to the
        # buffer.  format_exception() items end with a newline and may hold
        # more than one physical line (the "File ..." line plus the source
        # line), so each item is split before logging.
        if exc:
            import traceback

            for chunk in traceback.format_exception(exc[0], exc[1], exc[2]):
                for exc_line in chunk.splitlines():
                    self.trace_buffer.log(exc_line)
def dump_trace_buffer(buffer_name=None, file_name=None):
    """
    dump_trace_buffer(buffer_name, file_name)

    Write the trace buffer named *buffer_name* to the file *file_name*.

    Every buffer is dumped when *buffer_name* is ``None``, and a file name
    is generated when *file_name* is ``None``. The output is written to
    ``/usr/local/tmp/<file_name>``.

    :class:`UnknownTraceBufferError` is raised when *buffer_name* is given
    but no such buffer exists.
    """
    # All of the work happens in the extension module.
    ext.dump_trace_buffer(buffer_name, file_name)
def clear_trace_buffer(buffer_name=None):
    """
    clear_trace_buffer(buffer_name)

    Empty the trace buffer named *buffer_name*.

    Every buffer is cleared when *buffer_name* is ``None``.

    :class:`UnknownTraceBufferError` is raised when *buffer_name* is given
    but no such buffer exists.
    """
    # All of the work happens in the extension module.
    ext.clear_trace_buffer(buffer_name)
# Source files whose functions are never reported as the origin of a log.
_FilteredFiles = []
# Code object of a filtered function -> function at which skipping stops,
# or None when only that single function is to be skipped.
_FilteredCallers = {}


def add_callstack_filter(filter_func, last_func=None, is_file=False):
    """
    add_callstack_filter(filter_func, last_func, is_file)

    Register a file or a function that ``findCaller()`` must skip over.

    The ``logging`` module and :class:`ExosLogger` call ``findCaller()`` to
    obtain the file name, line number and function name shown in a log
    message. It walks back through the call stack, ignoring every function
    that is

    - defined in one of the files in *_FilteredFiles*, or
    - present in *_FilteredCallers*.

    :class:`TypeError` is raised when *filter_func* is not ``callable()``.

    With *is_file* set to ``True``, the file that defines *filter_func* is
    added to *_FilteredFiles* and *last_func* is ignored; every function
    defined in that file is then skipped.

    Otherwise *filter_func* itself is added to *_FilteredCallers*. When
    *last_func* is ``None`` (or anything else that is not ``callable()``),
    only *filter_func* is skipped. When *last_func* is ``callable()``, every
    function on the stack between *filter_func* (most recent call) and
    *last_func* (earliest call) is skipped.
    """
    if not callable(filter_func):
        raise TypeError("Function is not callable")

    if is_file:
        import inspect

        source_file = inspect.getfile(filter_func.__code__)
        # Keep the list free of duplicates.
        if source_file not in _FilteredFiles:
            _FilteredFiles.append(source_file)
        return

    # Anything that cannot be called is treated as "no stop function".
    _FilteredCallers[filter_func.__code__] = last_func if callable(last_func) else None


def _findCaller(self):
    """
    _findCaller(self)

    Walk back through the call stack and return ``(file, line, function)``
    for the first frame that is not filtered, i.e. the code that is
    generating the log. Placeholder values are returned when no such frame
    is found.
    """
    # NOTE(review): Python 3's Logger.findCaller takes (stack_info,
    # stacklevel) and returns a 4-tuple; this replacement takes only self
    # and returns three values -- confirm the target interpreter.
    filename = "(unknown file)"
    lineno = 0
    funcname = "(unknown function)"

    # Nothing to do if the logging module doesn't support this functionality.
    if not logging._srcfile:
        return (filename, lineno, funcname)

    import inspect

    frame = inspect.currentframe().f_back
    try:
        while frame:
            code = frame.f_code

            if code in _FilteredCallers:
                stop_func = _FilteredCallers[code]
                if not stop_func:
                    # No stop function: skip just this one frame.
                    frame = frame.f_back
                    continue
                # Rewind until the frame running stop_func is current (or
                # the stack runs out), then let the outer loop examine it.
                while frame:
                    if frame.f_code == stop_func.__code__:
                        break
                    frame = frame.f_back
                continue

            if code.co_filename in _FilteredFiles:
                # Defined in a filtered file; keep rewinding the stack.
                frame = frame.f_back
                continue

            # First unfiltered frame: this is the caller to report.
            filename = code.co_filename
            lineno = frame.f_lineno
            funcname = code.co_name
            break
    except Exception:
        # Best effort only: fall back to whatever has been collected so far.
        pass
    finally:
        del frame

    return (filename, lineno, funcname)


# Seed the filter with this file and the logging module's file.
add_callstack_filter(add_callstack_filter, is_file=True)

# Patch the logging.Logger class so that the _findCaller() defined above
# is used by all loggers.
logging.Logger.findCaller = _findCaller
add_callstack_filter(logging.Logger.__init__, is_file=True)

# A custom logger class may have been installed; give it the same treatment.
if logging.getLoggerClass() is not logging.Logger:
    logging.getLoggerClass().findCaller = _findCaller
    add_callstack_filter(logging.getLoggerClass().__init__, is_file=True)
class ExosLogger(logging.LoggerAdapter):
    """An ExosLogger defines system log entries, such as those found in
    "show log".

    Each condition added to the ExosLogger gains a method of the same name
    on the instance; calling that method logs the condition.
    """

    def __init__(
        self,
        name,
        title,
        format_version,
        content_version=1,
        component_id=None,
        factory_threshold=Severity.INFO,
        _internal_id=None,
    ):
        """
        __init__(name, title, format_version, content_version, component_id, factory_threshold)

        Create an ExosLogger together with the Python Logger that backs it.

        *name* is the name given to the Python Logger.

        *title* is an informational string describing the logger.

        *format_version* should be increased whenever the formatting of
        conditions in the ExosLogger changes.

        *content_version* should be increased whenever conditions are added
        to the ExosLogger without a formatting change. Defaults to 1.

        *component_id* is a unique number greater than 511 and less than
        1024. When omitted it is derived by hashing *name*, which is
        sufficient for most applications. :class:`ValueError` is raised for
        a value outside that range.

        *factory_threshold* is the Severity level used for filtering by
        default. Log messages below this level are dropped unless the
        threshold is overridden by configuration. Defaults to Severity.INFO.
        """
        # Our super class, LoggerAdapter, wraps the Python logger for *name*.
        backing_logger = logging.getLogger(name)
        super(ExosLogger, self).__init__(backing_logger, None)

        if component_id is not None:
            # A caller-supplied ID must not conflict with EXOS processes.
            if component_id < 512 or component_id > 1023:
                raise ValueError("component_id must be between 512 and 1023, inclusive")
        elif _internal_id is not None:
            # Internal applications: take the ID without question.  It comes
            # from get_logger() when generated from an ems xml file.
            component_id = _internal_id
        else:
            # Generate the component_id from the name.
            # NOTE(review): Python 3 salts str hashes per process unless
            # PYTHONHASHSEED is fixed, so this value may differ between
            # runs there -- confirm the target interpreter.
            component_id = hash(name) % 512 + 512

        # Create the LogComponent that will back this ExosLogger.
        threshold = self._map_sev_ems(factory_threshold)
        self.comp = ext.LogComponent(name, title, threshold, component_id, format_version, content_version)

    # Severity -> EMS severity constant.  DEBUG has no constant of its own
    # and uses the DEBUG_SUMMARY one.
    _sevmap_ems = {
        Severity.CRITICAL: ext._EMS_CRITICAL_SEVERITY,
        Severity.ERROR: ext._EMS_ERROR_SEVERITY,
        Severity.WARNING: ext._EMS_WARNING_SEVERITY,
        Severity.NOTICE: ext._EMS_NOTICE_SEVERITY,
        Severity.INFO: ext._EMS_INFO_SEVERITY,
        Severity.DEBUG: ext._EMS_DEBUG_SUMMARY_SEVERITY,
        Severity.DEBUG_SUMMARY: ext._EMS_DEBUG_SUMMARY_SEVERITY,
        Severity.DEBUG_VERBOSE: ext._EMS_DEBUG_VERBOSE_SEVERITY,
        Severity.DEBUG_DATA: ext._EMS_DEBUG_DATA_SEVERITY,
    }

    def _map_sev_ems(self, sev):
        """Return the EMS severity for *sev*; ValueError if *sev* is unknown."""
        if sev not in Severity:
            raise ValueError("Unknown severity")
        return ExosLogger._sevmap_ems[sev]

    # Severity -> Python logging level.  NOTICE folds into INFO and all of
    # the DEBUG* levels fold into DEBUG.
    _sevmap_logging = {
        Severity.CRITICAL: logging.CRITICAL,
        Severity.ERROR: logging.ERROR,
        Severity.WARNING: logging.WARNING,
        Severity.NOTICE: logging.INFO,
        Severity.INFO: logging.INFO,
        Severity.DEBUG: logging.DEBUG,
        Severity.DEBUG_SUMMARY: logging.DEBUG,
        Severity.DEBUG_VERBOSE: logging.DEBUG,
        Severity.DEBUG_DATA: logging.DEBUG,
    }

    def _map_sev_logging(self, sev):
        """Return the logging level for *sev*; ValueError if *sev* is unknown."""
        if sev not in Severity:
            raise ValueError("Unknown severity")
        return ExosLogger._sevmap_logging[sev]

    def _process_msg(self, msg, params):
        """Tame the given message into something EMS can handle.

        Every ``%<param>%`` token in *msg* is replaced by ``%<index>%``,
        where the index is the position of that parameter in *params*.
        """
        for index, param in enumerate(params):
            named_token = "%" + param + "%"
            msg = msg.replace(named_token, "%{}%".format(index))
        return msg

    def register(self):
        """Register this ExosLogger with EXOS.

        Once registered, the log conditions can be used and will produce
        entries in show log. Conditions cannot be added after registering.
        """
        # Registration is done lazily using an FSM, most of which runs on
        # the dispatcher thread.  Start it there too, via a zero-delay
        # timer, to avoid problems.
        timer = dispatch.DispatchTimer(0, self.comp.register)
        timer.start()

    def ready(self):
        """Return ``True`` if this ExosLogger has registered and is ready to
        log conditions.
        """
        return bool(self.comp.ready())

    def add_condition(self, name, msg, params=None, sev=Severity.WARNING):
        """
        add_condition(name, msg, params, sev)

        Add a log condition to this ExosLogger.

        :class:`LogComponentRegisteredError` is raised if the ExosLogger
        has already been registered.

        A method called *name* is added to this logger for the new
        condition. Its arguments are matched against *params*, positional
        arguments first and keyword arguments after; left-over arguments
        raise :class:`TypeError`. Calling it before the logger is
        registered raises :class:`RuntimeError`.

        *msg* is the log message. It may contain parameters as "%foo%".

        *params* is an optional list of parameter names for the condition.
        These must match the names used in *msg*.

        *sev* is the severity of the condition. The default is
        :data:`Severity.WARNING`.
        """
        if params is None:
            params = []

        # Map the severity for both destinations.
        logsev = self._map_sev_logging(sev)
        emssev = self._map_sev_ems(sev)

        # Create the condition, with the message massaged into the
        # positional-token form.  This may raise an exception.
        code = self.comp.add_condition(name, self._process_msg(msg, params), emssev, len(params))

        # Closure wrapping notify() for this particular condition.
        def _notify(*args, **kwds):
            positional = list(args)

            # Fill each expected parameter: positional values are consumed
            # in order first; once they run out, the value is looked up by
            # keyword.
            notify_args = []
            for param in params:
                value = positional.pop(0) if positional else kwds.pop(param)
                notify_args.append(str(value))

            # Everything should have been consumed by now.  If not, the
            # caller sent something we didn't expect.
            if positional or kwds:
                raise TypeError("Unexpected arguments")

            # Get the filename/linenum/funcname by inspecting our caller's
            # frame.
            filename, linenum, funcname = self.logger.findCaller()

            # Pass it to EMS.  This is where the log actually happens.
            try:
                self.comp.notify(code, filename, linenum, funcname, *notify_args)
            except LogComponentNotRegisteredError:
                raise RuntimeError("ExosLogger is not registered")

            # Log into our logger.  We can't format it, but do the best we
            # can.
            self.logger.log(logsev, " ".join(notify_args))

        # Bind the closure to this instance; the condition is logged by
        # calling it.
        vars(self)[name] = _notify