import logging
import threading
import os
import pathlib
from collections import defaultdict
from logging.config import dictConfig
import requests
LOG_DIR = pathlib.Path("~/.plasmalab/logs").expanduser()
[docs]
class TimedSummaryFilter(logging.Filter):
"""This can deal with very frequent records from libraries like socketio.
1. Start timer, intercept everything with specified level.
- count identical records (identical: levelno & message & args)
- let first through, drop any subsequent (i.e. infrequent records are
not affected)
2. Timer finishes, make a summary and log it.
- log the counts (minus the first one) per each record entry
- restart timer, continue with step 1.
"""
def __init__(self, interval=10, levels=[logging.INFO]):
super().__init__()
self.interval = interval
self.levels = levels
self._counts = defaultdict(int)
self._timer = None
self.log_summary()
def __del__(self):
if self._timer is not None:
self._timer.cancel()
pass
[docs]
def filter(self, record):
if any(l in self.levels for l in (record.levelno, record.levelname)):
id_ = (record.levelno, record.msg, record.args)
self._counts[id_] += 1
return self._counts[id_] <= 1 # First goes through, others do not
else:
return True
[docs]
def log_summary(self):
for (level, msg, args), count in self._counts.items():
if count <= 1:
continue
logging.getLogger().log(
level, "(%d per %ds) " + msg, count - 1, self.interval, *args)
self._counts.clear()
self._timer = threading.Timer(self.interval, self.log_summary)
self._timer.daemon = True
self._timer.start()
[docs]
class SlackHandler(logging.Handler):
"""Send messages or files to a Slack channel. To send a file, specify the
path to the file as a file attribute:
slacklog.info("Sending file", extra={"file": "path/to/file.txt"})
"""
def __init__(self, token, channel):
"""
:token str: authorization token for Slack API
:channel str: channel name (e.g. #general) or channel id
"""
super().__init__()
self.slack_token = token
self.slack_channel = channel
[docs]
def emit(self, record):
emote = {
logging.ERROR: ":large_red_square:",
logging.WARNING: ":large_yellow_square:",
}.get(record.levelno, "")
text = f"{emote} _{record.levelname.upper()}:_ {record.getMessage()}"
headers = {"Authorization": f"Bearer {self.slack_token}"}
try:
if hasattr(record, "file"):
r = requests.post(
"https://slack.com/api/files.upload",
headers=headers,
data={"channels": self.slack_channel, "initial_comment": text},
files={"file": open(record.file, "rb")},
)
else:
r = requests.post(
"https://slack.com/api/chat.postMessage",
headers=headers,
data={"channel": self.slack_channel, "text": text},
)
r.raise_for_status()
except (FileNotFoundError, requests.RequestException) as e:
# self.handleError(e)
raise
return
if not r.json()["ok"]:
# self.handleError(RuntimeError(f"Slack API error: {r.json()['error']}"))
raise RuntimeError(f"Slack API error: {r.json()['error']}")