-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathlogs.py
More file actions
137 lines (108 loc) · 5.02 KB
/
logs.py
File metadata and controls
137 lines (108 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Log-related functions and classes.
This module currently contains code that allows us to filter out logs by invocaton
ID, so that they may be returned when invocations are queried.
"""
from collections.abc import MutableSequence
import logging
from uuid import UUID
from weakref import WeakValueDictionary
from .invocation_contexts import get_invocation_id
from .exceptions import LogConfigurationError, NoInvocationContextError
THING_LOGGER = logging.getLogger("labthings_fastapi.things")
def inject_invocation_id(record: logging.LogRecord) -> bool:
r"""Add the invocation ID to records.
This function adds the current invocation ID to log records. If it is not
available, we set the record's ``invocation_id`` property to `None`\ .
:param record: the `logging.LogRecord` object to modify.
:return: `True` (which signals we should keep every record if this is used
as a filter).
"""
try:
id = get_invocation_id()
record.invocation_id = id
except NoInvocationContextError:
record.invocation_id = None
return True
class DequeByInvocationIDHandler(logging.Handler):
"""A log handler that stores entries in memory."""
def __init__(
self,
level: int = logging.INFO,
) -> None:
"""Set up a log handler that appends messages to a deque.
.. warning::
This log handler does not currently rotate or truncate
the list. It's best to use a `deque` with a finite capacity
to avoid memory leaks.
:param level: sets the level of the logger. For most invocations,
a log level of `logging.INFO` is appropriate.
"""
super().__init__()
self.setLevel(level)
self.destinations = WeakValueDictionary[UUID, MutableSequence]()
self.addFilter(inject_invocation_id)
def add_destination_for_id(self, id: UUID, destination: MutableSequence) -> None:
"""Append logs matching ``id`` to a specified sequence.
:param id: the ``invocation_id`` to match.
:param destination: should specify a deque, to which we will append
each log entry as it comes in. This is assumed to be thread
safe.
"""
self.destinations[id] = destination
def emit(self, record: logging.LogRecord) -> None:
"""Save a log record to the destination deque.
:param record: the `logging.LogRecord` object to add.
"""
id = getattr(record, "invocation_id", None)
if isinstance(id, UUID):
try:
self.destinations[id].append(record)
except KeyError:
pass # If there's no destination for a particular log, ignore it.
def configure_thing_logger(level: int | None = None) -> None:
"""Set up the logger for thing instances.
We always set the logger for thing instances to level INFO by default,
as this is currently used to relay progress to the client.
This function will collect logs on a per-invocation
basis by adding a `.DequeByInvocationIDHandler` to the log. Only one
such handler will be added - subsequent calls are ignored.
Unfortunately, filters must be added to every sub-logger, so globally adding
a filter to add invocation ID is not possible. Instead, we attach a filter to
the handler, which filters all the records that propagate to it (i.e. anything
that starts with ``labthings_fastapi.things``).
:param level: the logging level to use. If not specified, we use INFO.
"""
if level is not None:
THING_LOGGER.setLevel(level)
else:
THING_LOGGER.setLevel(logging.INFO)
if not any(
isinstance(h, DequeByInvocationIDHandler) for h in THING_LOGGER.handlers
):
THING_LOGGER.addHandler(DequeByInvocationIDHandler())
def add_thing_log_destination(
invocation_id: UUID, destination: MutableSequence
) -> None:
"""Append logs matching ``invocation_id`` to a specified sequence.
This instructs a handler on the logger used for `.Thing` instances to append a copy
of the logs generated by that invocation to the specified sequence.
This is primarily used by invocation threads to collect their logs, so they
may be returned when the invocation is queried.
:param invocation_id: the ``invocation_id`` to match.
:param destination: should specify a deque, to which we will append
each log entry as it comes in. This is assumed to be thread
safe.
:raises LogConfigurationError: if there is not exactly one suitable handler.
"""
handlers = [
h for h in THING_LOGGER.handlers if isinstance(h, DequeByInvocationIDHandler)
]
if len(handlers) != 1:
if len(handlers) == 0:
msg = "There is no suitable handler on {THING_LOGGER}."
else:
msg = "There were multiple matching handlers on {THING_LOGGER}, "
msg += "which should not happen: this is a LabThings bug."
raise LogConfigurationError(msg)
handler = handlers[0]
handler.add_destination_for_id(invocation_id, destination)