-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy path_actor.py
More file actions
160 lines (127 loc) · 5.85 KB
/
_actor.py
File metadata and controls
160 lines (127 loc) · 5.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# License: MIT
# Copyright © 2023 Frequenz Energy-as-a-Service GmbH
"""Actor model implementation."""
import abc
import asyncio
import logging
from datetime import timedelta
from .._internal._asyncio import is_loop_running
from ._background_service import BackgroundService
# Module-level logger, named after this module, used for all actor log messages.
_logger = logging.getLogger(__name__)
class Actor(BackgroundService, abc.ABC):
    """An autonomously running primitive unit of computation.

    Subclasses provide the actor's logic by implementing the
    [`_run()`][frequenz.sdk.actor--the-_run-method] method. The base class calls
    [`_run()`][frequenz.sdk.actor--the-_run-method] once the actor is started and
    expects it to keep running until the actor is stopped.

    When `_run()` fails with an unhandled `Exception`, it is run again after
    waiting `RESTART_DELAY`, as long as the restart limit has not been reached.

    !!! info

        Please read the [`actor` module documentation][frequenz.sdk.actor] for a
        more comprehensive guide on how to use and implement actors properly.
    """

    RESTART_DELAY: timedelta = timedelta(seconds=2)
    """The delay to wait between restarts of this actor."""

    _restart_limit: int | None = None
    """The number of times actors can be restarted when they are stopped by unhandled exceptions.

    If this is bigger than 0 or `None`, the actor will be restarted when there is an
    unhandled exception in the `_run()` method.

    If `None`, the actor will be restarted an unlimited number of times.

    !!! note

        This is mostly used for testing purposes and shouldn't be set in production.
    """

    def __init__(self, *, name: str | None = None) -> None:
        """Initialize the actor.

        Args:
            name: The name of this background service.
        """
        super().__init__(name=name)
        # Set by `cancel()` and reset by `start()`; consulted by `_run_loop()` to
        # decide between stopping and restarting after an exception.
        self._is_cancelled = False

    def start(self) -> None:
        """Start this actor.

        Does nothing when the actor is already running. Otherwise the cancelled
        flag is reset, the previous tasks are dropped and a new task running
        `_run_loop()` is created.
        """
        if not self.is_running:
            self._is_cancelled = False
            self._tasks.clear()
            run_loop_task = asyncio.create_task(self._run_loop())
            self._tasks.add(run_loop_task)

    @abc.abstractmethod
    async def _run(self) -> None:
        """Run this actor's logic."""

    async def _delay_if_restart(self, iteration: int) -> None:
        """Wait `RESTART_DELAY` before a restart; return at once on the first run.

        Args:
            iteration: The current iteration of the restart (0 means first run).
        """
        # NB: In the future it may make sense to think about diminishing returns
        # the longer the actor has been running, not only for the restart delay
        # but for the restart counter as well.
        if iteration <= 0:
            return
        seconds = self.RESTART_DELAY.total_seconds()
        _logger.info("Actor %s: Waiting %s seconds...", self, seconds)
        await asyncio.sleep(seconds)

    async def _run_loop(self) -> None:
        """Run `_run()` repeatedly, handling restarts, cancellation and termination.

        `_run()` is awaited in a loop. A normal return ends the loop. An unhandled
        `Exception` triggers a restart until the restart limit is reached, unless
        the actor was cancelled, in which case the exception is logged and the
        loop ends.

        Raises:
            asyncio.CancelledError: If the actor's `_run()` method is cancelled.
            Exception: If the actor's `_run()` method raises any other exception.
            BaseException: If the actor's `_run()` method raises any base exception.
        """
        _logger.info("Actor %s: Started.", self)
        restart_count = 0
        while True:
            try:
                await self._delay_if_restart(restart_count)
                await self._run()
                _logger.info("Actor %s: _run() returned without error.", self)
            except asyncio.CancelledError:
                _logger.info("Actor %s: Cancelled.", self)
                raise
            except Exception:  # pylint: disable=broad-except
                if not is_loop_running():
                    _logger.exception(
                        "Something went wrong, no running event loop,"
                        " not trying to restart %s again.",
                        self,
                    )
                    raise

                if self._is_cancelled:
                    # When the actor was cancelled but some tasks failed with an
                    # exception other than asyncio.CancelledError, the failures
                    # arrive combined in an ExceptionGroup or BaseExceptionGroup.
                    # That case must stop the actor rather than restart it.
                    _logger.exception(
                        "Actor %s: Raised an unhandled exception during stop.", self
                    )
                    break

                _logger.exception("Actor %s: Raised an unhandled exception.", self)
                limit = self._restart_limit
                limit_repr = "∞" if limit is None else limit
                progress = f"({restart_count}/{limit_repr})"

                if limit is not None and restart_count >= limit:
                    _logger.info(
                        "Actor %s: Maximum restarts attempted %s, bailing out...",
                        self,
                        progress,
                    )
                    raise

                # Leaving the handler without raising or breaking starts the next
                # loop iteration, i.e. the restart.
                restart_count += 1
                _logger.info("Actor %s: Restarting %s...", self, progress)
            except BaseException:  # pylint: disable=broad-except
                _logger.exception("Actor %s: Raised a BaseException.", self)
                raise
            else:
                # `_run()` finished normally: no restart needed.
                break

        _logger.info("Actor %s: Stopped.", self)

    def cancel(self, msg: str | None = None) -> None:
        """Cancel this actor.

        Marks the actor as cancelled, so `_run_loop()` stops instead of restarting
        when an exception surfaces afterwards, then delegates to the base class.
        The cancelled mark is cleared again by `start()`.

        Args:
            msg: The message to be passed to the tasks being cancelled.
        """
        if msg is not None:
            _logger.info("Actor %s cancelled, reason: %s", self, msg)
        self._is_cancelled = True
        return super().cancel(msg)