Source code for seq.lib.log

"""
Logging config
"""
# pylint: disable= invalid-name

import logging
import logging.config
import socket
import threading
import datetime
import time
import json

from pathlib import Path
from seq.lib import keywords as KW

from ModSeqif import Seqif
import elt.pymal as mal

logger = logging.getLogger(__name__)
_old_log_factory = logging.getLogRecordFactory()  # Old factory


def _log_record_factory(*args, **kwargs):
    from seq.lib.nodes import (Action, Sequence)

    record = _old_log_factory(*args, **kwargs)
    current_node = Action.current_node.get()
    current_seq = Sequence.current_seq.get()
    sn = -1                      # safe default
    
    if current_node:
       sn = current_node.serial_number
    elif current_seq:
        sn = current_seq.serial_number
    record._seq_node_sn = sn
    return record


def _merge_log_config(d1, d2):
    def keyval(k, a, b):
        # print("KVAL", k, a.get(k,None) or b.get(k,None))
        return b.get(k, None) or a.get(k, None)

    mykeys = {*d1, *d2}
    return {
        k: _merge_log_config(d1.get(k, {}), d2.get(k, {}))
        if isinstance(d1.get(k, None), dict)
        else keyval(k, d1, d2)
        for k in mykeys
    }


[docs]def logConfig(level=logging.INFO, filename=None, remote=False, pub_id=None): """ Configs logging for seq module Args: level(int): logging level. filename(str): logging file name Returns: None """ d = LOGGING if remote: d = _merge_log_config(LOGGING, _REMOTE_CFG) if pub_id: d["handlers"]["publisher"]["pub_id"] = pub_id d["loggers"]["seq"]["level"] = level if filename is None: filename = "seq.log" p = Path(filename) p = p.with_name("seq_user.log") if "filename" not in d["handlers"]["logfile"]: d["handlers"]["logfile"]["filename"] = filename if "filename" not in d["handlers"]["logfileUser"]: d["handlers"]["logfileUser"]["filename"] = p.as_posix() logging.config.dictConfig(d) logging.setLogRecordFactory(_log_record_factory) seq_logger = logging.getLogger("seq") seq_logger.info("seq logger initialized") seq_lib_logger = logging.getLogger("seq.lib") seq_lib_logger.info("seq.lib logger initialized") user_logger = logging.getLogger("seq.user") logger.info("log filename: %s", d["handlers"]["logfile"]["filename"]) user_logger.info("User's Logging initialized")
# if remote: # seq_logger.info("REMOTE LOG CONFIG: %s", json.dumps(d, indent=4))
[docs]def getUserLogger(): """Gets the user's Logger.""" return logging.getLogger("seq.user")
LOGGING = { "version": 1, "disable_existing_loggers": True, "formatters": { "simple": { "format": "%(levelname)s:(%(module)s.%(funcName)s): %(message)s" }, "verbose": { "format": "%(asctime)s [%(_seq_node_sn)s] <%(name)s> %(levelname)s:(%(module)s.%(funcName)s):%(lineno)-d: %(message)s" }, "timed": { "format": "%(asctime)s %(levelname)s: %(message)s", }, }, "handlers": { "console": { "class": "logging.StreamHandler", "formatter": "simple", }, "logfile": { "class": "logging.FileHandler", "formatter": "verbose", # "filename": "dummy.log", # "filename": "%(filename)s", "mode": "w", }, "logfileUser": { "class": "logging.FileHandler", "formatter": "verbose", # "filename": "dummy.log", # "filename": "%(filename)s", "mode": "w", }, }, "loggers": { "seq": { "level": logging.INFO, "handlers": ["console", "logfile"], "propagate": False, "formatter": "verbose", }, "seq.lib": { "level": logging.INFO, "handlers": ["logfile"], "propagate": False, "formatter": "verbose", }, "seq.user": { "level": logging.INFO, "handlers": ["logfileUser","console"], "propagate": False, "formatter": "verbose", }, }, } _REMOTE_CFG = { "handlers": { "publisher": { "class": "seq.lib.log.FastDDSPublisherHandler", "formatter": "verbose", "pub_id": "UNDEFINED", }, }, "loggers": { # "": { # Allows to use any logger name in sequencer scripts # "level": logging.INFO, # "handlers": ["console", "publisher"], # "propagate": False, # "formatter": "verbose", # }, "seq": { "level": logging.INFO, "handlers": ["console", "logfile", "publisher"], }, "seq.lib": { "level": logging.INFO, "handlers": ["logfile", "publisher"], }, "seq.user": { "level": logging.INFO, "handlers": ["logfileUser", "publisher"], }, }, }
[docs]class FastDDSPublisherHandler(logging.Handler): """ FastDDSPublisherHandler ====================== A python logging handler. It relies on fastdds publisher that will publish every new record to subscribers. """ def __init__(self, pub_id=""): super(FastDDSPublisherHandler, self).__init__() uri = "dds.ps:///{}{}".format(KW.SEQ_LOG_TOPIC, pub_id) self._ddsMal = mal.loadMal("dds", {}) factory = mal.CiiFactory.getInstance() factory.registerMal("dds", self._ddsMal) self._publisher = factory.getPublisher( uri, Seqif.LogRecord, qos=mal.ps.qos.DEFAULT )
[docs] def emit(self, record: logging.LogRecord): try: timestamp = "%s.%03d" % ( record.__getattribute__("asctime"), record.__getattribute__("msecs"), ) except AttributeError: timestamp = "" pub_log_record = self._publisher.createDataEntity() pub_log_record.setSwmodule(record.module) pub_log_record.setFuncname(record.funcName) pub_log_record.setLineno(record.lineno) pub_log_record.setTimestamp(timestamp) pub_log_record.setLevelname(record.levelname) pub_log_record.setName(record.name) pub_log_record.setSerialNumber(record._seq_node_sn) # we store the log message in a blob to overcome length limit binary_log = bytearray() binary_log.extend(map(ord, record.getMessage())) vec_log = mal.SharedVectorUint8(binary_log) vec_log = vec_log.freeze() # to set the blob message, we have to read the blob message of the published # log record and to assign the binary data to it blob_log = pub_log_record.getMessage() blob_log.setBlob(vec_log) timestamp = time.time() self._publisher.publishWithTimestamp( timestamp, pub_log_record, datetime.timedelta(seconds=1) )