# ******************************************************
#
# Copyright (c) Extreme Networks Inc. 2014, 2021
# All rights reserved
#
# ******************************************************
import logging
from exos.api import util, dispatch
import _exos_ext_ems as ext
# Bring the exception types into this module
from _exos_ext_ems import ( # noqa: F401
UnknownTraceBufferError,
TraceBufferAlreadyExistsError,
LogComponentRegisteredError,
LogComponentNotRegisteredError,
)
# EXOS severity levels. These will be mapped to Python logging severity
# levels as appropriate.
Severity = util.Enum(
["CRITICAL", "ERROR", "WARNING", "NOTICE", "INFO", "DEBUG", "DEBUG_SUMMARY", "DEBUG_VERBOSE", "DEBUG_DATA"]
)
[docs]class TraceBuffer(ext.TraceBuffer):
"""A circular in-memory log buffer."""
def __init__(self, buffer_name, buffer_size):
super(TraceBuffer, self).__init__(buffer_name, buffer_size)
[docs] def dump(self, file_name=None):
"""
dump(file_name)
Dump this trace buffer to *file_name*. When *file_name* is ``None``,
the filename is generated. The file is always written under
``/usr/local/tmp``.
"""
super(TraceBuffer, self).dump(file_name)
[docs] def clear(self):
"""
clear()
Clear this trace buffer. All existing log entries will be deleted.
"""
super(TraceBuffer, self).clear()
[docs] def log(self, msg):
"""
log(msg)
Log *msg* to the trace buffer
"""
# Chop log entries so they are no more than 1K in length.
# - When retrieving entries, EMS balks at entries longer than
# emsProcessTrace.lineBuffer (1024). The end result is it quits
# grabbing logs and bails out of loop early. So, chopping it up here
# is a bit of a workaround.
# - Leave some extra space for "timestamp <process:buffer>".
entries = [msg[i : 900 + i] for i in range(0, len(msg), 900)]
first = True
for entry in entries:
if first:
super(TraceBuffer, self).log(entry)
else:
super(TraceBuffer, self).log(">>> " + entry)
first = False
[docs]class TraceBufferHandler(logging.Handler):
"""A :py:class:`logging.Handler` that's backed by a :class:`TraceBuffer`.
Allows the standard Python logging mechanism to be used with trace buffers.
*buffer_name* and *buffer_size* are used to create the
:class:`TraceBuffer`.
"""
[docs] def __init__(self, buffer_name, buffer_size):
"""Create a new handler, backed by a trace buffer named *buffer_name*.
:class:`TraceBufferAlreadyExistsError` is thrown if the trace buffer
has already been created.
"""
super(TraceBufferHandler, self).__init__()
#: The underlying trace buffer
self.trace_buffer = TraceBuffer(buffer_name, buffer_size)
[docs] def emit(self, record):
"""Write *record* to the trace buffer."""
# Don't let the formatter format an exception. It'll use \n and EMS
# hates that.
exc = record.exc_info
record.exc_info = None
# Format the message and log it
msg = self.format(record)
self.trace_buffer.log(msg)
# Replace the exc so other handlers have it.
record.exc_info = exc
# Now, deal with the exception by adding it one line at a time to the buffer.
if exc:
import traceback
for exc_line in traceback.format_exception(exc[0], exc[1], exc[2]):
self.trace_buffer.log(exc_line)
[docs]def dump_trace_buffer(buffer_name=None, file_name=None):
"""
dump_trace_buffer(buffer_name, file_name)
Dump a trace buffer *buffer_name* to a file *file_name*. If *buffer_name*
is ``None``, then all buffers is dumped. If *file_name* is ``None``,
the file name will be generated. The trace buffer is written to
``/usr/local/tmp/<file_name>``. If *buffer_name* is given, but does not
exist, :class:`UnknownTraceBufferError` is raised.
"""
ext.dump_trace_buffer(buffer_name, file_name)
[docs]def clear_trace_buffer(buffer_name=None):
"""
clear_trace_buffer(buffer_name)
Clear a trace buffer ``buffer_name``. If ``buffer_name`` is ``None``, then
all buffers are cleared. If *buffer_name* is given, but does not exist,
:class:`UnknownTraceBufferError` is raised.
"""
ext.clear_trace_buffer(buffer_name)
_FilteredFiles = []
_FilteredCallers = {}
def add_callstack_filter(filter_func, last_func=None, is_file=False):
"""
add_callstack_filter(filter_func, last_func, is_file)
Add to the files or functions that will be skipped over by ``findCaller()``.
``findCaller()`` is used by the ``logging`` module and :class:`ExosLogger`
to determine the file name, line number and function to be included in a
log message. For this ``findCaller()`` traverses the function call stack
and skips any functions that are:
- defined in a file in *_FilteredFiles*
- in *_FilteredCallers*
This method provides a mechanism to add files to *_FilteredFiles* and
functions to *_FilteredCallers*.
If *filter_func* is not ``callable()``, :class:`TypeError` is raised.
If *is_file* is ``True`` then the file in which the *filter_func* is defined
is added to *_FilteredFiles*; *last_func* is ignored in this case. This
results in all functions defined in the same file as *filter_func* being
skipped by ``findCaller()``.
Otherwise, *filter_func* is added to *_FilteredCallers*. In this case if
*last_func* is ``None`` then only the function identified by *filter_func*
is skipped. However, if *last_func* is also ``callable()`` then any function
in the call stack between *filter_func* (most recent call) and *last_func*
(earliest call) is skipped.
"""
global _FilteredFiles
global _FilteredCallers
if not callable(filter_func):
raise TypeError("Function is not callable")
if is_file:
import inspect
file = inspect.getfile(filter_func.__code__)
if file not in _FilteredFiles:
_FilteredFiles.append(file)
return
if not callable(last_func):
last_func = None
_FilteredCallers[filter_func.__code__] = last_func
def _findCaller(self):
"""
_findCaller(self)
Follow the call stack and find the function that is generating the log.
"""
file = "(unknown file)"
line = 0
func = "(unknown function)"
# Nothing to do if the logging module doesn't support this functionality.
if not logging._srcfile:
return (file, line, func)
import inspect
frame = inspect.currentframe().f_back
try:
while frame:
if frame.f_code not in _FilteredCallers:
if frame.f_code.co_filename not in _FilteredFiles:
file = frame.f_code.co_filename
line = frame.f_lineno
func = frame.f_code.co_name
break
# Function for this frame is defined in a filtered-file; so
# continue to rewind the stack
frame = frame.f_back
continue
# Function for this frame is filtered-caller; so rewind the
# stack ...
stop_func = _FilteredCallers[frame.f_code]
if not stop_func:
# ... only 1 frame (no stop_func)
frame = frame.f_back
continue
# ... till stop_func
while frame:
if frame.f_code == stop_func.__code__:
break
frame = frame.f_back
except Exception:
pass
finally:
del frame
return (file, line, func)
# Seed the filter with this file and the logging module's file.
add_callstack_filter(add_callstack_filter, is_file=True)
# Patch the logging.Logger class so that the _findCaller() defined above
# is used by all loggers.
logging.Logger.findCaller = _findCaller
add_callstack_filter(logging.Logger.__init__, is_file=True)
if logging.getLoggerClass() is not logging.Logger:
logging.getLoggerClass().findCaller = _findCaller
add_callstack_filter(logging.getLoggerClass().__init__, is_file=True)
[docs]class ExosLogger(logging.LoggerAdapter):
"""An ExosLogger defines system log entries, such as those found in "show
log". As conditions are added to the ExosLogger, methods are added to log
those conditions.
"""
[docs] def __init__(
self,
name,
title,
format_version,
content_version=1,
component_id=None,
factory_threshold=Severity.INFO,
_internal_id=None,
):
"""
__init__(name, title, format_version, content_version, component_id, factory_threshold)
Create an ExosLogger and a Python Logger to back it. The Python Logger
will use the name *name*.
*title* is an informational string, describing the logger.
*format_version* should be increased any time formatting of conditions in
the ExosLogger is changed.
*content_version* should be increased any time conditions are added to
the ExosLogger and formatting is not changed. Defaults to 1.
*component_id* is a unique number greater than 511 and less than 1024.
If it is not provided, the name will be hashed to generate it. This is
sufficient for most applications.
*factory_threshold* is the Severity level at which we'll filter by default.
Log messages below this level will be dropped, unless the threshold
is overridden by configuration. By default, Severity.INFO.
"""
# Init the LoggerAdapter, our super class, by getting our logger.
super(ExosLogger, self).__init__(logging.getLogger(name), None)
if component_id is None:
# If this is an internal application, take the ID without question.
# This will come from get_logger() when generated from an ems xml
# file.
if _internal_id is not None:
component_id = _internal_id
else:
# Generate the component_id.
component_id = hash(name) % 512 + 512
else:
# If the component_id was provided, it must not conflict with EXOS
# processes.
if component_id < 512 or component_id > 1023:
raise ValueError("component_id must be between 512 and 1023, inclusive")
# Create the LogComponent that will back this ExosLogger.
self.comp = ext.LogComponent(
name, title, self._map_sev_ems(factory_threshold), component_id, format_version, content_version
)
_sevmap_ems = {
Severity.CRITICAL: ext._EMS_CRITICAL_SEVERITY,
Severity.ERROR: ext._EMS_ERROR_SEVERITY,
Severity.WARNING: ext._EMS_WARNING_SEVERITY,
Severity.NOTICE: ext._EMS_NOTICE_SEVERITY,
Severity.INFO: ext._EMS_INFO_SEVERITY,
Severity.DEBUG: ext._EMS_DEBUG_SUMMARY_SEVERITY,
Severity.DEBUG_SUMMARY: ext._EMS_DEBUG_SUMMARY_SEVERITY,
Severity.DEBUG_VERBOSE: ext._EMS_DEBUG_VERBOSE_SEVERITY,
Severity.DEBUG_DATA: ext._EMS_DEBUG_DATA_SEVERITY,
}
def _map_sev_ems(self, sev):
if sev not in Severity:
raise ValueError("Unknown severity")
return ExosLogger._sevmap_ems[sev]
_sevmap_logging = {
Severity.CRITICAL: logging.CRITICAL,
Severity.ERROR: logging.ERROR,
Severity.WARNING: logging.WARNING,
Severity.NOTICE: logging.INFO,
Severity.INFO: logging.INFO,
Severity.DEBUG: logging.DEBUG,
Severity.DEBUG_SUMMARY: logging.DEBUG,
Severity.DEBUG_VERBOSE: logging.DEBUG,
Severity.DEBUG_DATA: logging.DEBUG,
}
def _map_sev_logging(self, sev):
if sev not in Severity:
raise ValueError("Unknown severity")
return ExosLogger._sevmap_logging[sev]
def _process_msg(self, msg, params):
"""Tame the given message into something EMS can handle. """
for i, param in enumerate(params):
in_token = "%" + param + "%"
out_token = "%" + str(i) + "%"
msg = msg.replace(in_token, out_token)
return msg
[docs] def register(self):
"""Register this ExosLogger with EXOS. After registering, the log conditions
can be used and entries made in show log. However, conditions cannot be
added after registering.
"""
# Start the registration on the dispatch thread. The registration is done
# lazily using an FSM. Most of the FSM will run on the dispatcher thread, so
# avoid problems by starting it there, too.
dispatch.DispatchTimer(0, self.comp.register).start()
[docs] def ready(self):
"""Return ``True`` if this ExosLogger has registered and is ready to log
conditions.
"""
return True if self.comp.ready() else False
[docs] def add_condition(self, name, msg, params=None, sev=Severity.WARNING):
"""
add_condition(name, msg, params, sev)
Add a log condition to this ExosLogger. A :class:`LogComponentRegisteredError`
is raised if the ExosLogger has been registered. A method *name* is added to
this logger for the new condition.
*msg* is the log message. It may contain parameters as "%foo%".
*params* is an optional list of parameters to the condition. These
must match names used in the *msg*.
*sev* is the severity of the condition. The default is :data:`Severity.WARNING`.
"""
if params is None:
params = []
# Map the severity.
logsev = self._map_sev_logging(sev)
emssev = self._map_sev_ems(sev)
# Massage the msg into an EPM acceptable format
epm_msg = self._process_msg(msg, params)
# Create the condition. This may raise an exception.
code = self.comp.add_condition(name, epm_msg, emssev, len(params))
# Create a closure to wrap notify().
def _notify(*args, **kwds):
# Get the arguments into a mutable list
arglist = list(args)
# Build the argument list by matching our arguments with the
# expected set of params.
notify_args = []
for param in params:
# Start with the positional arguments. These have to go first.
if len(arglist) > 0:
notify_args.append(str(arglist.pop(0)))
# If we're out of positional arguments, look for keyword matches.
else:
notify_args.append(str(kwds.pop(param)))
# We should have consumed all of our arguments. If not, then they
# sent us something we didn't expect.
if len(arglist) or len(kwds):
raise TypeError("Unexpected arguments")
# Get the filename/linenum/funcname by inspecting our caller's frame.
filename, linenum, funcname = self.logger.findCaller()
# Pass it to EMS. This is where the log actually happens.
try:
self.comp.notify(code, filename, linenum, funcname, *notify_args)
except LogComponentNotRegisteredError:
raise RuntimeError("ExosLogger is not registered")
# Log into our logger. We can't format it, but do the best we can.
self.logger.log(logsev, " ".join(notify_args))
# Bind the closure to ourselves. The condition will be logged by calling
# this closure.
self.__dict__[name] = _notify