"""Thin logging facade around a single, lazily created ``unitap`` logger.

The module exposes module-level helpers (``info``, ``error``, ``log_every_n``,
...) that all route through the logger returned by :func:`get_logger`, plus
helpers to send the same output to a file.
"""
import logging as _logging
import os as _os
import sys as _sys
import _thread
import time as _time
import traceback as _traceback
import warnings
from logging import DEBUG
from logging import ERROR
from logging import CRITICAL
from logging import INFO
from logging import WARN
import threading

# Don't use this directly. Use get_logger() instead.
_logger = None
_logger_lock = threading.Lock()

# Handler installed by enable_file_logging(); None while file logging is off.
_file_handler = None


def set_enabled(value: bool):
    """Turn the shared logger on (``True``) or off (``False``)."""
    get_logger().disabled = not value


def is_enabled():
    """Return ``True`` unless the shared logger has been disabled."""
    return not get_logger().disabled


def error_log(error_msg, level=ERROR):
    """Empty helper method."""
    del error_msg, level


def _get_formatter():
    """Build the formatter shared by the stream handler and the file handler."""
    return _logging.Formatter(
        fmt='%(asctime)s.%(msecs)03d %(name)-12s %(levelname)-8s %(thread)-6d %(message)s',
        datefmt='%H:%M:%S')


def _get_caller(offset=3):
    """Returns a code and frame object for the lowest non-logging stack frame.

    The frame ``offset`` levels above this function is taken as the anchor;
    the stack is then walked outwards until a frame from a *different* file
    than the anchor is found.

    Args:
        offset: How many frames above ``_get_caller`` the anchor frame is.

    Returns:
        A ``(code, frame)`` pair, or ``(None, None)`` if every outer frame
        belongs to the anchor's file.

    Raises:
        ValueError: If the call stack is shallower than ``offset``.
    """
    # Use sys._getframe(). This avoids creating a traceback object.
    f = _sys._getframe(offset)
    our_file = f.f_code.co_filename
    f = f.f_back
    while f:
        code = f.f_code
        if code.co_filename != our_file:
            return code, f
        f = f.f_back
    return None, None


def _logger_find_caller(stack_info=False, stacklevel=1):
    """Replacement for ``Logger.findCaller`` that skips this module's wrappers.

    Args:
        stack_info: When true, a formatted stack trace is returned as well.
        stacklevel: Accepted for signature compatibility; not used.

    Returns:
        ``(filename, line number, function name, stack info or None)``.
    """
    # Offset 4 anchors on the frame four levels up, which is the wrapper in
    # this module when called via Logger.<level>() -> Logger._log().
    code, frame = _get_caller(4)
    sinfo = None
    if stack_info:
        # Every entry from format_stack() already ends in a newline, so the
        # entries are concatenated as-is; starting at `frame` keeps the
        # logging internals out of the reported stack.
        sinfo = ''.join(_traceback.format_stack(frame)).rstrip('\n')
    if code:
        return code.co_filename, frame.f_lineno, code.co_name, sinfo
    return '(unknown file)', 0, '(unknown function)', sinfo


def get_logger():
    """Return the shared ``unitap`` logger, creating it on first use.

    On creation the logger's ``findCaller`` is overridden so records point at
    the code calling this module. If the root logger has no handlers, a
    stream handler is attached (stdout and level INFO in interactive
    sessions, stderr otherwise) and propagation is switched off.
    """
    global _logger

    # Use double-checked locking to avoid taking lock unnecessarily.
    if _logger is not None:
        return _logger

    with _logger_lock:
        if _logger is not None:
            return _logger

        logger = _logging.getLogger('unitap')

        # Override findCaller on the logger to skip internal helper functions
        logger.findCaller = _logger_find_caller

        # Leave output entirely to the root logger when it is already
        # configured; otherwise install our own handler.
        if not _logging.getLogger().handlers:
            # Determine whether we are in an interactive environment
            try:
                # This is only defined in interactive shells.
                interactive = bool(_sys.ps1)
            except AttributeError:
                # Even now, we may be in an interactive shell with `python -i`.
                interactive = bool(_sys.flags.interactive)

            # If we are in an interactive environment (like Jupyter), set
            # loglevel to INFO and pipe the output to stdout.
            if interactive:
                logger.setLevel(INFO)
                target = _sys.stdout
            else:
                target = _sys.stderr

            # Add the output handler.
            handler = _logging.StreamHandler(target)
            handler.setFormatter(_get_formatter())
            logger.addHandler(handler)
            # Our own handler emits the record; do not hand it to the root
            # logger as well.
            logger.propagate = False

        _logger = logger
        return _logger


def log(level, msg, *args, **kwargs):
    """Log ``msg % args`` at the given numeric ``level``."""
    get_logger().log(level, msg, *args, **kwargs)


def debug(msg, *args, **kwargs):
    """Log ``msg % args`` at DEBUG level."""
    get_logger().debug(msg, *args, **kwargs)


def error(msg, *args, **kwargs):
    """Log ``msg % args`` at ERROR level."""
    get_logger().error(msg, *args, **kwargs)


def fatal(msg, *args, **kwargs):
    """Log ``msg % args`` at CRITICAL level."""
    # Logger.fatal is only a legacy alias of Logger.critical.
    get_logger().critical(msg, *args, **kwargs)


def info(msg, *args, **kwargs):
    """Log ``msg % args`` at INFO level."""
    get_logger().info(msg, *args, **kwargs)


def warn(msg, *args, **kwargs):
    """Log ``msg % args`` at WARNING level (alias of :func:`warning`)."""
    get_logger().warning(msg, *args, **kwargs)


def warning(msg, *args, **kwargs):
    """Log ``msg % args`` at WARNING level."""
    get_logger().warning(msg, *args, **kwargs)


_level_names = {
    CRITICAL: 'CRITICAL',
    ERROR: 'ERROR',
    WARN: 'WARN',
    INFO: 'INFO',
    DEBUG: 'DEBUG',
}

# Mask to convert integer thread ids to unsigned quantities for logging
# purposes
_THREAD_ID_MASK = 2 * _sys.maxsize + 1

_log_prefix = None  # later set to google2_log_prefix

# Counter to keep track of number of log entries per token.
_log_counter_per_token = {}


def TaskLevelStatusMessage(msg):
    """Log ``msg`` at ERROR level."""
    error(msg)


def flush():
    """Not implemented; always raises ``NotImplementedError``."""
    raise NotImplementedError()


# Code below is taken from pyglib/logging
def vlog(level, msg, *args, **kwargs):
    """Log ``msg % args`` at the given numeric ``level``."""
    get_logger().log(level, msg, *args, **kwargs)


def _GetNextLogCountPerToken(token):
    """Return how often ``token`` was seen before this call, then count it.

    The first call for a token returns 0, the next one 1, and so on.
    """
    _log_counter_per_token[token] = 1 + _log_counter_per_token.get(token, -1)
    return _log_counter_per_token[token]


def log_every_n(level, msg, n, *args):
    """Log 'msg % args' at level 'level' once per 'n' times.

    Logs the 1st call, (N+1)st call, (2N+1)st call, etc.
    Not threadsafe.

    Args:
        level: The level at which to log.
        msg: The message to be logged.
        n: The number of times this should be called before it is logged.
            Must be non-zero (the counter is reduced modulo ``n``).
        *args: The args to be substituted into the msg.
    """
    count = _GetNextLogCountPerToken(_GetFileAndLine())
    log_if(level, msg, not (count % n), *args)


def log_first_n(level, msg, n, *args):
    """Log 'msg % args' at level 'level' only first 'n' times.

    Not threadsafe.

    Args:
        level: The level at which to log.
        msg: The message to be logged.
        n: The maximum number of times the message is logged.
        *args: The args to be substituted into the msg.
    """
    count = _GetNextLogCountPerToken(_GetFileAndLine())
    log_if(level, msg, count < n, *args)


def log_if(level, msg, condition, *args):
    """Log 'msg % args' at level 'level' only if condition is fulfilled."""
    if condition:
        vlog(level, msg, *args)


def _GetFileAndLine():
    """Returns (filename, linenumber) for the stack frame.

    The frame reported is the first one outside this module, i.e. the code
    that called into the logging helpers. ``('', 0)`` is returned when no
    such frame exists.
    """
    # Anchor on this function's own frame (offset 1 as seen from
    # _get_caller) so the walk skips every frame of this module and stops at
    # the first external caller. The default offset would anchor on the
    # external caller itself and therefore skip past it.
    code, f = _get_caller(offset=1)
    if not code:
        return '', 0
    return code.co_filename, f.f_lineno


def google2_log_prefix(level, timestamp=None, file_and_line=None):
    """Assemble a logline prefix using the google2 format.

    Args:
        level: Numeric log level; levels missing from ``_level_names`` are
            rendered with severity ``'I'``.
        timestamp: Seconds since the epoch; the current time when ``None``.
        file_and_line: ``(filename, line)`` pair; looked up from the call
            stack when omitted.

    Returns:
        The prefix string, ending in ``'] '``.
    """
    # Record current time. Compare against None so that an explicit
    # timestamp of 0 is honoured instead of being replaced by "now".
    now = _time.time() if timestamp is None else timestamp
    now_tuple = _time.localtime(now)
    now_microsecond = int(1e6 * (now % 1.0))

    (filename, line) = file_and_line or _GetFileAndLine()
    basename = _os.path.basename(filename)

    # Severity string: first letter of the level name.
    severity = 'I'
    if level in _level_names:
        severity = _level_names[level][0]

    s = '%c%02d%02d %02d:%02d:%02d.%06d %5d %s:%d] ' % (
        severity,
        now_tuple[1],  # month
        now_tuple[2],  # day
        now_tuple[3],  # hour
        now_tuple[4],  # min
        now_tuple[5],  # sec
        now_microsecond,
        _get_thread_id(),
        basename,
        line)

    return s


def get_verbosity():
    """Return how much logging output will be produced."""
    return get_logger().getEffectiveLevel()


def set_verbosity(v):
    """Sets the threshold for what messages will be logged."""
    get_logger().setLevel(v)


def set_file_output(filename, file_mode="w"):
    """Set file handler.

    Deprecated: use :func:`enable_file_logging` and
    :func:`disable_file_logging` instead.
    """
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(
        "This function is deprecated. Use `enable_file_logging` and `disable_file_logging` instead.",
        DeprecationWarning,
        stacklevel=2)
    enable_file_logging(filename, file_mode)


def enable_file_logging(filename, file_mode="w"):
    """Send log output to ``filename`` in addition to any other handlers.

    If file logging is already enabled, output is redirected to the new file
    and the previous file is closed.

    Args:
        filename: Path of the log file.
        file_mode: Mode used to open the file (``"w"`` truncates).

    Raises:
        OSError: If the file cannot be opened; an already active file
            handler is left untouched in that case.
    """
    global _file_handler

    # Open the new target first so a failure cannot leave the logger with a
    # handler whose stream is already closed.
    new_handler = _logging.FileHandler(filename, mode=file_mode)
    new_handler.setFormatter(_get_formatter())

    logger = get_logger()
    old_handler = _file_handler
    logger.addHandler(new_handler)
    _file_handler = new_handler

    if old_handler is not None:
        # Detach before closing so no record is emitted to a closed stream.
        logger.removeHandler(old_handler)
        old_handler.close()


def disable_file_logging():
    """Stop logging to the file and close it; warn if it was not enabled."""
    global _file_handler

    if _file_handler is None:
        warnings.warn(
            "Cannot disable file logging because file logger is not defined",
            stacklevel=2)
        return

    handler, _file_handler = _file_handler, None
    # Detach before closing so no record is emitted to a closed stream.
    get_logger().removeHandler(handler)
    handler.close()


def _get_thread_id():
    """Get id of current thread, suitable for logging as an unsigned quantity."""
    thread_id = _thread.get_ident()
    return thread_id & _THREAD_ID_MASK


_log_prefix = google2_log_prefix