forked from frequenz-floss/frequenz-dispatch-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_merge_strategies.py
More file actions
69 lines (53 loc) · 2.1 KB
/
_merge_strategies.py
File metadata and controls
69 lines (53 loc) · 2.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# License: MIT
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
"""Different merge strategies for dispatch running state events."""
import logging
from collections.abc import Mapping
from sys import maxsize
from typing import Any
from frequenz.client.dispatch.types import DispatchId
from typing_extensions import override
from ._actor_dispatcher import DispatchActorId
from ._bg_service import MergeStrategy
from ._dispatch import Dispatch
def _hash_positive(args: Any) -> int:
    """Return a non-negative integer derived from the hash of `args`.

    The built-in `hash()` may return negative values; adding `maxsize + 1`
    shifts the result so that it is never below zero. The result inherits all
    other properties of `hash()`: for example, `str` hashes differ between
    interpreter runs unless `PYTHONHASHSEED` is fixed.

    Args:
        args: Any hashable object.

    Returns:
        `hash(args)` shifted upwards by `maxsize + 1`.
    """
    offset = maxsize + 1
    return offset + hash(args)
# Log through a module-scoped logger: the module-level `logging.debug()`
# helper targets the root logger and implicitly calls `logging.basicConfig()`
# when the root logger has no handlers, which library code should not do.
_logger = logging.getLogger(__name__)


class MergeByType(MergeStrategy):
    """Merge running intervals based on the dispatch type.

    Dispatches for which `identity()` returns equal values are treated as one
    group: a stop event is only let through once no other dispatch of the same
    group is still started.
    """

    @override
    def identity(self, dispatch: Dispatch) -> DispatchActorId:
        """Return the merge identity of a dispatch.

        Args:
            dispatch: The dispatch to compute the identity for.

        Returns:
            An identity built from the non-negative hash of the dispatch type.
        """
        return DispatchActorId(_hash_positive(dispatch.type))

    @override
    def filter(
        self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch
    ) -> bool:
        """Filter dispatches based on the merge strategy.

        Keeps start events.
        Keeps stop events only if no other dispatches matching the
        strategy's criteria are running.

        Args:
            dispatches: The known dispatches, keyed by dispatch ID.
            dispatch: The dispatch whose event is being filtered.

        Returns:
            `True` if the event should be kept, `False` if it should be
            dropped.
        """
        if dispatch.started:
            _logger.debug("Keeping start event %s", dispatch.id)
            return True

        # The identity of the incoming dispatch does not change while scanning
        # the other dispatches, so compute it once instead of per entry.
        identity = self.identity(dispatch)
        other_dispatches_running = any(
            existing_dispatch.started
            for existing_dispatch in dispatches.values()
            if existing_dispatch.id != dispatch.id
            and self.identity(existing_dispatch) == identity
        )

        _logger.debug(
            "stop event %s because other_dispatches_running=%s",
            dispatch.id,
            other_dispatches_running,
        )
        return not other_dispatches_running
class MergeByTypeTarget(MergeByType):
    """Merge running intervals based on the dispatch type and target.

    Only the identity differs from the parent strategy: two dispatches belong
    to the same group when both their type and their target hash equally.
    """

    @override
    def identity(self, dispatch: Dispatch) -> DispatchActorId:
        """Return the merge identity of a dispatch.

        Args:
            dispatch: The dispatch to compute the identity for.

        Returns:
            An identity built from the non-negative hash of the dispatch type
            combined with its target.
        """
        # The target is turned into a tuple before hashing (presumably because
        # it may be an unhashable sequence such as a list).
        merge_key = (dispatch.type, tuple(dispatch.target))
        return DispatchActorId(_hash_positive(merge_key))