"""
Logging config
"""
# pylint: disable=invalid-name
import logging
import logging.config
import datetime
import time
from pathlib import Path
from seq.lib import keywords as KW
from ModSeqif import Seqif
import elt.pymal as mal
logger = logging.getLogger(__name__)
_old_log_factory = logging.getLogRecordFactory()  # previous factory, wrapped by _log_record_factory
def _log_record_factory(*args, **kwargs):
    """Record factory that tags every LogRecord with the current node/sequence serial number."""
    # Local import keeps seq.lib.nodes out of module load time.
    from seq.lib.nodes import Action, Sequence

    record = _old_log_factory(*args, **kwargs)
    current_node = Action.current_node.get()
    current_seq = Sequence.current_seq.get()
    sn = -1  # safe default when no node or sequence is active
    if current_node:
        sn = current_node.serial_number
    elif current_seq:
        sn = current_seq.serial_number
    record._seq_node_sn = sn
    return record
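# Illustrative sketch (assumes seq.lib.nodes is importable): once logConfig()
# installs this factory, every record exposes _seq_node_sn, which the
# "verbose" formatter below renders via %(_seq_node_sn)s.
#
#   logging.setLogRecordFactory(_log_record_factory)
#   rec = logging.getLogger("seq").makeRecord(
#       "seq", logging.INFO, __file__, 0, "hello", (), None)
#   rec._seq_node_sn  # -> -1 while no node or sequence is active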
def _merge_log_config(d1, d2):
    """Recursively merges two dictConfig dictionaries; values in ``d2`` take precedence."""
    def keyval(k, a, b):
        return b.get(k, None) or a.get(k, None)

    mykeys = {*d1, *d2}
    return {
        k: _merge_log_config(d1.get(k, {}), d2.get(k, {}))
        if isinstance(d1.get(k, None), dict)
        else keyval(k, d1, d2)
        for k in mykeys
    }
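# Merge semantics, sketched from the implementation above: nested dicts are
# merged key by key, and scalar values from the second dict win when truthy.
#
#   base = {"handlers": {"console": {"level": "INFO"}}, "version": 1}
#   override = {"handlers": {"console": {"level": "DEBUG"}}}
#   _merge_log_config(base, override)
#   # -> {"handlers": {"console": {"level": "DEBUG"}}, "version": 1}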
def logConfig(level=logging.INFO, filename=None, remote=False, pub_id=None):
    """
    Configures logging for the seq module.

    Args:
        level(int): logging level.
        filename(str): log file name; defaults to "seq.log".
        remote(bool): if True, also publish records through the Fast DDS handler.
        pub_id(str): publisher identifier used to build the DDS topic URI.

    Returns:
        None
    """
    d = LOGGING
    if remote:
        d = _merge_log_config(LOGGING, _REMOTE_CFG)
        if pub_id:
            d["handlers"]["publisher"]["pub_id"] = pub_id
    d["loggers"]["seq"]["level"] = level
    if filename is None:
        filename = "seq.log"
    # The user log file lives next to the main log file, under a fixed name.
    p = Path(filename).with_name("seq_user.log")
    if "filename" not in d["handlers"]["logfile"]:
        d["handlers"]["logfile"]["filename"] = filename
    if "filename" not in d["handlers"]["logfileUser"]:
        d["handlers"]["logfileUser"]["filename"] = p.as_posix()
    logging.config.dictConfig(d)
    logging.setLogRecordFactory(_log_record_factory)
    seq_logger = logging.getLogger("seq")
    seq_logger.info("seq logger initialized")
    seq_lib_logger = logging.getLogger("seq.lib")
    seq_lib_logger.info("seq.lib logger initialized")
    user_logger = logging.getLogger("seq.user")
    logger.info("log filename: %s", d["handlers"]["logfile"]["filename"])
    user_logger.info("User logging initialized")
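# Minimal usage sketch (the module path "seq.lib.log" is inferred from
# _REMOTE_CFG below; the file paths are hypothetical):
#
#   from seq.lib import log as seqlog
#   seqlog.logConfig(level=logging.DEBUG, filename="/tmp/seq.log")
#   seqlog.getUserLogger().info("hello")  # -> /tmp/seq_user.log and the console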
def getUserLogger():
    """Returns the user's logger ("seq.user")."""
    return logging.getLogger("seq.user")
LOGGING = {
    "version": 1,
    "disable_existing_loggers": True,
    "formatters": {
        "simple": {
            "format": "%(levelname)s:(%(module)s.%(funcName)s): %(message)s"
        },
        "verbose": {
            "format": (
                "%(asctime)s [%(_seq_node_sn)s] <%(name)s> "
                "%(levelname)s:(%(module)s.%(funcName)s):%(lineno)d: %(message)s"
            )
        },
        "timed": {
            "format": "%(asctime)s %(levelname)s: %(message)s",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "simple",
        },
        "logfile": {
            "class": "logging.FileHandler",
            "formatter": "verbose",
            # "filename" is injected by logConfig()
            "mode": "w",
        },
        "logfileUser": {
            "class": "logging.FileHandler",
            "formatter": "verbose",
            # "filename" is injected by logConfig()
            "mode": "w",
        },
    },
    "loggers": {
        "seq": {
            "level": logging.INFO,
            "handlers": ["console", "logfile"],
            "propagate": False,
        },
        "seq.lib": {
            "level": logging.INFO,
            "handlers": ["logfile"],
            "propagate": False,
        },
        "seq.user": {
            "level": logging.INFO,
            "handlers": ["logfileUser", "console"],
            "propagate": False,
        },
    },
}
_REMOTE_CFG = {
    "handlers": {
        "publisher": {
            "class": "seq.lib.log.FastDDSPublisherHandler",
            "formatter": "verbose",
            "pub_id": "UNDEFINED",  # overridden by logConfig(pub_id=...)
        },
    },
    "loggers": {
        # "": {  # would allow any logger name in sequencer scripts
        #     "level": logging.INFO,
        #     "handlers": ["console", "publisher"],
        #     "propagate": False,
        # },
        "seq": {
            "level": logging.INFO,
            "handlers": ["console", "logfile", "publisher"],
        },
        "seq.lib": {
            "level": logging.INFO,
            "handlers": ["logfile", "publisher"],
        },
        "seq.user": {
            "level": logging.INFO,
            "handlers": ["logfileUser", "publisher"],
        },
    },
}
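# Sketch of what logConfig(remote=True) builds: the remote overlay is merged on
# top of LOGGING, so e.g. the "seq" logger gains the "publisher" handler while
# keeping the base formatters and file handlers.
#
#   cfg = _merge_log_config(LOGGING, _REMOTE_CFG)
#   cfg["loggers"]["seq"]["handlers"]  # -> ["console", "logfile", "publisher"]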
class FastDDSPublisherHandler(logging.Handler):
    """
    FastDDSPublisherHandler
    =======================

    A Python logging handler that relies on a Fast DDS publisher to push
    every new record to subscribers.
    """

    def __init__(self, pub_id=""):
        super().__init__()
        uri = "dds.ps:///{}{}".format(KW.SEQ_LOG_TOPIC, pub_id)
        self._ddsMal = mal.loadMal("dds", {})
        factory = mal.CiiFactory.getInstance()
        factory.registerMal("dds", self._ddsMal)
        self._publisher = factory.getPublisher(
            uri, Seqif.LogRecord, qos=mal.ps.qos.DEFAULT
        )
    def emit(self, record: logging.LogRecord):
        """Publishes the record through the Fast DDS publisher."""
        try:
            # "asctime" only exists once a formatter has processed the record
            timestamp = "%s.%03d" % (record.asctime, record.msecs)
        except AttributeError:
            timestamp = ""
        pub_log_record = self._publisher.createDataEntity()
        pub_log_record.setSwmodule(record.module)
        pub_log_record.setFuncname(record.funcName)
        pub_log_record.setLineno(record.lineno)
        pub_log_record.setTimestamp(timestamp)
        pub_log_record.setLevelname(record.levelname)
        pub_log_record.setName(record.name)
        # -1 when the record was created without the seq record factory installed
        pub_log_record.setSerialNumber(getattr(record, "_seq_node_sn", -1))
        # The log message is stored in a blob to overcome the length limit.
        binary_log = bytearray(record.getMessage().encode("utf-8"))
        vec_log = mal.SharedVectorUint8(binary_log)
        vec_log = vec_log.freeze()
        # To set the blob message, read the blob of the record being published
        # and assign the binary data to it.
        blob_log = pub_log_record.getMessage()
        blob_log.setBlob(vec_log)
        self._publisher.publishWithTimestamp(
            time.time(), pub_log_record, datetime.timedelta(seconds=1)
        )
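# Sketch of attaching the handler by hand instead of via dictConfig; the
# pub_id value is hypothetical, and logConfig(remote=True) is the normal path:
#
#   handler = FastDDSPublisherHandler(pub_id="seq1")
#   logging.getLogger("seq").addHandler(handler)
#   logging.getLogger("seq").info("published over DDS")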