Skip to content

Commit ea83518

Browse files
authored
Merge pull request #916 from simvue-io/feature/metric-size-limit
Add Maximum Size for Metrics
2 parents 01747f4 + 79aae2d commit ea83518

9 files changed

Lines changed: 301 additions & 115 deletions

File tree

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
"""
2-
Dispatch
3-
========
1+
"""Dispatch
42
53
Contains factory method for selecting dispatcher type based on Simvue Configuration
64
"""
@@ -20,11 +18,11 @@
2018

2119
def Dispatcher(
2220
mode: typing.Literal["direct", "queued"],
23-
callback: typing.Callable[[list[typing.Any], str, dict[str, typing.Any]], None],
21+
callback: typing.Callable[[list[typing.Any], str], None],
2422
object_types: list[str],
2523
termination_trigger: "Event",
2624
name: str | None = None,
27-
**kwargs,
25+
thresholds: dict[str, int | float] | None = None,
2826
) -> "DispatcherBaseClass":
2927
"""Returns instance of dispatcher based on configuration
3028
@@ -46,6 +44,10 @@ def Dispatcher(
4644
event which triggers termination of the dispatcher
4745
name : str | None, optional
4846
name for the underlying thread, default None
47+
thresholds: dict[str, int | float] | None, default None
48+
if metadata is provided during item addition, specify
49+
thresholds under which dispatch of an item is permitted,
50+
default is None
4951
5052
Returns
5153
-------
@@ -58,7 +60,7 @@ def Dispatcher(
5860
callback=callback,
5961
object_types=object_types,
6062
termination_trigger=termination_trigger,
61-
**kwargs,
63+
thresholds=thresholds,
6264
)
6365
else:
6466
logger.debug("Using queued dispatch for metric and queue sending")
@@ -67,5 +69,5 @@ def Dispatcher(
6769
object_types=object_types,
6870
termination_trigger=termination_trigger,
6971
name=name,
70-
**kwargs,
72+
thresholds=thresholds,
7173
)

simvue/dispatch/base.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import threading
2+
import abc
3+
import typing
4+
5+
from simvue.exception import ObjectDispatchError
6+
7+
8+
class DispatcherBaseClass(abc.ABC):
9+
"""Base class to all dispatchers.
10+
11+
A dispatcher is an object which sends data to a location,
12+
in this case it executes a callback based on criteria.
13+
"""
14+
15+
def __init__(
16+
self,
17+
*,
18+
callback: typing.Callable[[list[typing.Any], str], None],
19+
object_types: list[str],
20+
termination_trigger: threading.Event,
21+
thresholds: dict[str, int | float] | None = None,
22+
) -> None:
23+
"""Initialise a dispatcher.
24+
25+
Parameters
26+
----------
27+
callback : Callable[[list[Any]], str] | None
28+
callback to execute on data.
29+
object_types : list[str]
30+
categories of items for separate handling
31+
termination_trigger : Event
32+
trigger for closing this dispatcher
33+
thresholds : dict[str, int | float] | None, optional
34+
any additional thresholds to consider when handling items.
35+
This assumes metadata defining the values to compare to
36+
such thresholds is included when appending.
37+
"""
38+
super().__init__()
39+
self._thresholds: dict[str, int | float] = thresholds or {}
40+
self._object_types: list[str] = object_types
41+
self._termination_trigger = termination_trigger
42+
self._callback = callback
43+
44+
def add_item(
45+
self,
46+
item: typing.Any,
47+
*,
48+
object_type: str,
49+
metadata: dict[str, int | float] | None = None,
50+
**__,
51+
) -> None:
52+
"""Add an item to the dispatcher.
53+
54+
Parameters
55+
----------
56+
item : Any
57+
item to add to dispatch
58+
object_type : str
59+
category of item
60+
metadata : dict[str, int | float] | None, optional
61+
additional metadata relating to the item to be
62+
used for threshold comparisons
63+
"""
64+
_ = item
65+
_ = object_type
66+
if not metadata:
67+
return
68+
for key, threshold in self._thresholds.items():
69+
if key in metadata and metadata[key] > threshold:
70+
raise ObjectDispatchError(
71+
label=key, threshold=threshold, value=metadata[key]
72+
)
73+
74+
@abc.abstractmethod
75+
def run(self) -> None:
76+
"""Start the dispatcher."""
77+
pass
78+
79+
@abc.abstractmethod
80+
def start(self) -> None:
81+
"""Not used, this allows the class to be similar to a thread."""
82+
pass
83+
84+
@abc.abstractmethod
85+
def join(self) -> None:
86+
"""Not used, this allows the class to be similar to a thread."""
87+
pass
88+
89+
@abc.abstractmethod
90+
def purge(self) -> None:
91+
"""Clear the dispatcher of items."""
92+
pass
93+
94+
@abc.abstractmethod
95+
def is_alive(self) -> bool:
96+
"""Whether the dispatcher is operating correctly."""
97+
pass
98+
99+
@property
100+
@abc.abstractmethod
101+
def empty(self) -> bool:
102+
"""Whether the dispatcher is empty."""
103+
pass
Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ class DirectDispatcher(DispatcherBaseClass):
99

1010
def __init__(
1111
self,
12+
*,
1213
callback: typing.Callable[[list[typing.Any], str], None],
1314
object_types: list[str],
1415
termination_trigger: threading.Event,
15-
**_,
16+
thresholds: dict[str, int | float] | None = None,
1617
) -> None:
1718
"""Initialise a new DirectDispatcher instance
1819
@@ -24,15 +25,28 @@ def __init__(
2425
categories, this is mainly used for creation of queues in a QueueDispatcher
2526
termination_trigger : Event
2627
event which triggers termination of the dispatcher
28+
thresholds: int | float
29+
if metadata is provided during item addition, specify
30+
thresholds under which dispatch of an item is permitted,
31+
default is None
2732
"""
2833
super().__init__(
2934
callback=callback,
3035
object_types=object_types,
3136
termination_trigger=termination_trigger,
37+
thresholds=thresholds,
3238
)
3339

34-
def add_item(self, item: typing.Any, object_type: str, *_, **__) -> None:
40+
def add_item(
41+
self,
42+
item: typing.Any,
43+
*,
44+
object_type: str,
45+
metadata: dict[str, int | float] | None = None,
46+
**__,
47+
) -> None:
3548
"""Execute callback on the given item"""
49+
super().add_item(item, object_type=object_type, metadata=metadata)
3650
self._callback([item], object_type)
3751

3852
def run(self) -> None:
Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,14 @@ class QueuedDispatcher(threading.Thread, DispatcherBaseClass):
3333

3434
def __init__(
3535
self,
36+
*,
3637
callback: typing.Callable[[list[typing.Any], str], None],
3738
object_types: list[str],
3839
termination_trigger: threading.Event,
3940
name: str | None = None,
4041
max_buffer_size: int = MAX_BUFFER_SIZE,
4142
max_read_rate: float = MAX_REQUESTS_PER_SECOND,
43+
thresholds: dict[str, int | float] | None = None,
4244
) -> None:
4345
"""
4446
Initialise a new queue based dispatcher
@@ -58,34 +60,47 @@ def __init__(
5860
maximum number of items allowed in created buffer.
5961
max_read_rate : float
6062
maximum rate at which the callback can be executed
63+
thresholds: dict[str, int | float] | None, optional
64+
if metadata is provided during item addition, specify
65+
thresholds within which a single dispatch is permitted,
66+
default is None
6167
"""
6268
DispatcherBaseClass.__init__(
6369
self,
6470
callback=callback,
6571
object_types=object_types,
6672
termination_trigger=termination_trigger,
73+
thresholds=thresholds,
6774
)
6875
super().__init__(name=name, daemon=True)
6976

70-
self._termination_trigger = termination_trigger
71-
self._callback = callback
72-
self._queues = {label: queue.Queue() for label in object_types}
73-
self._max_read_rate = max_read_rate
74-
self._max_buffer_size = max_buffer_size
75-
self._send_timer = 0
77+
self._termination_trigger: threading.Event = termination_trigger
78+
self._callback: typing.Callable[[list[typing.Any], str], None] = callback
79+
self._queues: dict[str, queue.Queue[typing.Any]] = {
80+
label: queue.Queue() for label in object_types
81+
}
82+
self._max_read_rate: float = max_read_rate
83+
self._max_buffer_size: int = max_buffer_size
84+
self._send_timer: int = 0
7685

7786
def add_item(
78-
self, item: typing.Any, object_type: str, blocking: bool = True
87+
self,
88+
item: typing.Any,
89+
*,
90+
object_type: str,
91+
blocking: bool = True,
92+
metadata: dict[str, int | float] | None = None,
7993
) -> None:
8094
"""Add an item to the specified queue with/without blocking"""
95+
super().add_item(item, object_type=object_type, metadata=metadata)
8196
if self._termination_trigger.is_set():
8297
raise RuntimeError(
8398
f"Cannot append item '{item}' to queue '{object_type}', "
84-
"termination called."
99+
+ "termination called."
85100
)
86101
if object_type not in self._queues:
87102
raise KeyError(f"No queue '{object_type}' found")
88-
self._queues[object_type].put(item, block=blocking)
103+
self._queues[object_type].put((item, metadata or {}), block=blocking)
89104

90105
@property
91106
def empty(self) -> bool:
@@ -111,12 +126,23 @@ def _create_buffer(self, queue_label: str) -> list[typing.Any]:
111126
The length of the buffer is constrained.
112127
"""
113128
_buffer: list[typing.Any] = []
129+
_criteria: dict[str, int | float] = {}
130+
_threshold_totals: dict[str, float] = {k: 0 for k in self._thresholds}
114131

115132
while (
116133
not self._queues[queue_label].empty()
117134
and len(_buffer) < self._max_buffer_size
135+
and all(
136+
_threshold_totals[key] < self._thresholds[key]
137+
for key in _threshold_totals
138+
)
118139
):
119-
_item = self._queues[queue_label].get(block=False)
140+
_item, _metadata = typing.cast(
141+
"tuple[typing.Any, dict[str, int | float]]",
142+
self._queues[queue_label].get(block=False),
143+
)
144+
for key in _threshold_totals:
145+
_threshold_totals[key] += _metadata.get(key, 0)
120146
_buffer.append(_item)
121147
self._queues[queue_label].task_done()
122148

simvue/exception.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,13 @@ def __init__(self, obj_type: str, name: str, extra: str | None = None) -> None:
2121
class SimvueRunError(RuntimeError):
2222
"""A special sub-class of runtime error specifically for Simvue run errors"""
2323

24-
pass
24+
25+
class ObjectDispatchError(Exception):
26+
"""Raised if object dispatch failed due to condition."""
27+
28+
def __init__(self, label: str, threshold: int | float, value: int | float) -> None:
29+
self.msg = (
30+
f"Object dispatch failed, {label} "
31+
+ f"of {value} exceeds maximum permitted value of {threshold}"
32+
)
33+
super().__init__(self.msg)

simvue/factory/__init__.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

simvue/factory/dispatch/base.py

Lines changed: 0 additions & 46 deletions
This file was deleted.

0 commit comments

Comments
 (0)