Skip to content

Commit e8b2558

Browse files
committed
Simplify event loop handling and remove deprecated get_event_loop
1 parent 04d7d24 commit e8b2558

17 files changed

Lines changed: 97 additions & 141 deletions

File tree

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ addopts = """
9191
--tb=native -vv --doctest-modules --doctest-glob="*.md" --ignore-glob docs/snippets/*py --benchmark-sort=mean --benchmark-columns="mean, min, max, outliers, ops, rounds"
9292
"""
9393
# https://iscinumpy.gitlab.io/post/bound-version-constraints/#watch-for-warnings
94-
# https://github.com/DiamondLightSource/FastCS/issues/230
9594
filterwarnings = "error"
9695
# Doctest python code in docs, python code in src docstrings, test functions in tests
9796
testpaths = "docs src tests"

src/fastcs/control_system.py

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,9 @@ class FastCS:
2828
loop: Optional event loop to run the control system in
2929
"""
3030

31-
def __init__(
32-
self,
33-
controller: Controller,
34-
transports: Sequence[Transport],
35-
loop: asyncio.AbstractEventLoop | None = None,
36-
):
31+
def __init__(self, controller: Controller, transports: Sequence[Transport]):
3732
self._controller = controller
3833
self._transports = transports
39-
self._loop = loop or asyncio.get_event_loop()
4034

4135
self._scan_coros: list[ScanCallback] = []
4236
self._initial_coros: list[ScanCallback] = []
@@ -47,36 +41,25 @@ def run(self, interactive: bool = True):
4741
"""Run the application
4842
4943
This is a convenience method to call `serve` in a synchronous context.
44+
To use in an async context, call `serve` directly.
5045
5146
Args:
5247
interactive: Whether to create an interactive IPython shell
5348
5449
"""
55-
serve = asyncio.ensure_future(self.serve(interactive=interactive))
5650

57-
if os.name != "nt":
58-
self._loop.add_signal_handler(signal.SIGINT, serve.cancel)
59-
self._loop.add_signal_handler(signal.SIGTERM, serve.cancel)
60-
self._loop.run_until_complete(serve)
51+
async def _serve():
52+
"""Wrapper to add signal handlers and call `serve`"""
53+
loop = asyncio.get_running_loop()
54+
task = asyncio.current_task()
6155

62-
async def _run_initial_coros(self):
63-
for coro in self._initial_coros:
64-
await coro()
56+
if os.name != "nt" and task is not None:
57+
loop.add_signal_handler(signal.SIGINT, task.cancel)
58+
loop.add_signal_handler(signal.SIGTERM, task.cancel)
6559

66-
async def _start_scan_tasks(self):
67-
self._scan_tasks = {self._loop.create_task(coro()) for coro in self._scan_coros}
60+
await self.serve(interactive=interactive)
6861

69-
def _stop_scan_tasks(self):
70-
for task in self._scan_tasks:
71-
if not task.done():
72-
try:
73-
task.cancel()
74-
except (asyncio.CancelledError, RuntimeError):
75-
pass
76-
except Exception as e:
77-
raise RuntimeError("Unhandled exception in stop scan tasks") from e
78-
79-
self._scan_tasks.clear()
62+
asyncio.run(_serve())
8063

8164
async def serve(self, interactive: bool = True) -> None:
8265
"""Serve the control system over the given transports on the current event loop
@@ -110,7 +93,7 @@ async def serve(self, interactive: bool = True) -> None:
11093

11194
coros: list[Coroutine] = []
11295
for transport in self._transports:
113-
transport.connect(controller_api=self.controller_api, loop=self._loop)
96+
transport.connect(controller_api=self.controller_api)
11497
coros.append(transport.serve())
11598
common_context = context.keys() & transport.context.keys()
11699
if common_context:
@@ -153,16 +136,30 @@ async def block_forever():
153136
self._stop_scan_tasks()
154137
await self._controller.disconnect()
155138

139+
async def _run_initial_coros(self):
140+
for coro in self._initial_coros:
141+
await coro()
142+
143+
async def _start_scan_tasks(self):
144+
self._scan_tasks = {asyncio.create_task(coro()) for coro in self._scan_coros}
145+
156146
async def _interactive_shell(self, context: dict[str, Any]):
157147
"""Spawn interactive shell in another thread and wait for it to complete."""
148+
loop = asyncio.get_running_loop()
158149

159150
def run(coro: Coroutine[None, None, None]):
160151
"""Run coroutine on FastCS event loop from IPython thread."""
161152

162153
def wrapper():
163-
asyncio.create_task(coro)
154+
task = asyncio.create_task(coro)
155+
156+
def _log_exception(t: asyncio.Task):
157+
if not t.cancelled() and (exc := t.exception()):
158+
logger.exception("`run` task raised exception", exc_info=exc)
164159

165-
self._loop.call_soon_threadsafe(wrapper)
160+
task.add_done_callback(_log_exception)
161+
162+
loop.call_soon_threadsafe(wrapper)
166163

167164
async def interactive_shell(
168165
context: dict[str, object], stop_event: asyncio.Event
@@ -176,8 +173,24 @@ async def interactive_shell(
176173
context["run"] = run
177174

178175
stop_event = asyncio.Event()
179-
self._loop.create_task(interactive_shell(context, stop_event))
176+
shell_task = asyncio.create_task(interactive_shell(context, stop_event))
177+
180178
await stop_event.wait()
181179

180+
if not shell_task.cancelled() and (exc := shell_task.exception()):
181+
logger.exception("Interactive shell raised exception", exc_info=exc)
182+
183+
def _stop_scan_tasks(self):
184+
for task in self._scan_tasks:
185+
if not task.done():
186+
try:
187+
task.cancel()
188+
except (asyncio.CancelledError, RuntimeError):
189+
pass
190+
except Exception as e:
191+
raise RuntimeError("Unhandled exception in stop scan tasks") from e
192+
193+
self._scan_tasks.clear()
194+
182195
def __del__(self):
183196
self._stop_scan_tasks()

src/fastcs/launch.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
import inspect
32
import json
43
from pathlib import Path
@@ -158,9 +157,7 @@ def run(
158157
else:
159158
controller = controller_class()
160159

161-
instance = FastCS(
162-
controller, instance_options.transport, loop=asyncio.get_event_loop()
163-
)
160+
instance = FastCS(controller, instance_options.transport)
164161

165162
instance.run()
166163

src/fastcs/transports/epics/ca/ioc.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,8 @@ def __init__(
4141
_create_and_link_attribute_pvs(pv_prefix, controller_api)
4242
_create_and_link_command_pvs(pv_prefix, controller_api)
4343

44-
def run(
45-
self,
46-
loop: asyncio.AbstractEventLoop,
47-
) -> None:
48-
dispatcher = AsyncioDispatcher(loop) # Needs running loop
44+
def run(self) -> None:
45+
dispatcher = AsyncioDispatcher(asyncio.get_running_loop())
4946
builder.LoadDatabase()
5047
softioc.iocInit(dispatcher)
5148

src/fastcs/transports/epics/ca/transport.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
from dataclasses import dataclass, field
32
from typing import Any
43

@@ -28,13 +27,8 @@ class EpicsCATransport(Transport):
2827
gui: EpicsGUIOptions | None = None
2928
"""Options for the GUI. If not set, no GUI will be created."""
3029

31-
def connect(
32-
self,
33-
controller_api: ControllerAPI,
34-
loop: asyncio.AbstractEventLoop,
35-
) -> None:
30+
def connect(self, controller_api: ControllerAPI) -> None:
3631
self._controller_api = controller_api
37-
self._loop = loop
3832
self._pv_prefix = self.epicsca.pv_prefix
3933
self._ioc = EpicsCAIOC(self.epicsca.pv_prefix, controller_api)
4034

@@ -47,7 +41,7 @@ def connect(
4741
async def serve(self) -> None:
4842
"""Serve `ControllerAPI` over EPICS Channel Access"""
4943
logger.info("Running IOC", pv_prefix=self._pv_prefix)
50-
self._ioc.run(self._loop)
44+
self._ioc.run()
5145

5246
@property
5347
def context(self) -> dict[str, Any]:

src/fastcs/transports/epics/pva/transport.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
1-
import asyncio
21
from dataclasses import dataclass, field
32

43
from fastcs.controllers import ControllerAPI
54
from fastcs.logging import logger
6-
from fastcs.transports.epics import (
7-
EpicsDocsOptions,
8-
EpicsGUIOptions,
9-
EpicsIOCOptions,
10-
)
5+
from fastcs.transports.epics import EpicsDocsOptions, EpicsGUIOptions, EpicsIOCOptions
116
from fastcs.transports.epics.docs import EpicsDocs
127
from fastcs.transports.epics.pva.gui import PvaEpicsGUI
138
from fastcs.transports.transport import Transport
@@ -23,11 +18,7 @@ class EpicsPVATransport(Transport):
2318
docs: EpicsDocsOptions | None = None
2419
gui: EpicsGUIOptions | None = None
2520

26-
def connect(
27-
self,
28-
controller_api: ControllerAPI,
29-
loop: asyncio.AbstractEventLoop,
30-
) -> None:
21+
def connect(self, controller_api: ControllerAPI) -> None:
3122
self._controller_api = controller_api
3223
self._pv_prefix = self.epicspva.pv_prefix
3324
self._ioc = P4PIOC(self.epicspva.pv_prefix, controller_api)

src/fastcs/transports/graphql/transport.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
from dataclasses import dataclass, field
32

43
from fastcs.controllers import ControllerAPI
@@ -14,11 +13,7 @@ class GraphQLTransport(Transport):
1413

1514
graphql: GraphQLServerOptions = field(default_factory=GraphQLServerOptions)
1615

17-
def connect(
18-
self,
19-
controller_api: ControllerAPI,
20-
loop: asyncio.AbstractEventLoop,
21-
):
16+
def connect(self, controller_api: ControllerAPI):
2217
self._server = GraphQLServer(controller_api)
2318

2419
async def serve(self) -> None:

src/fastcs/transports/rest/transport.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
from dataclasses import dataclass, field
32

43
from fastcs.controllers import ControllerAPI
@@ -14,11 +13,7 @@ class RestTransport(Transport):
1413

1514
rest: RestServerOptions = field(default_factory=RestServerOptions)
1615

17-
def connect(
18-
self,
19-
controller_api: ControllerAPI,
20-
loop: asyncio.AbstractEventLoop,
21-
):
16+
def connect(self, controller_api: ControllerAPI):
2217
self._server = RestServer(controller_api)
2318

2419
async def serve(self) -> None:

src/fastcs/transports/tango/transport.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,9 @@ class TangoTransport(Transport):
1313

1414
tango: TangoDSROptions = field(default_factory=TangoDSROptions)
1515

16-
def connect(
17-
self,
18-
controller_api: ControllerAPI,
19-
loop: asyncio.AbstractEventLoop,
20-
):
21-
self._dsr = TangoDSR(controller_api, loop)
16+
def connect(self, controller_api: ControllerAPI):
17+
self._controller_api = controller_api
2218

2319
async def serve(self) -> None:
24-
coro = asyncio.to_thread(self._dsr.run, self.tango)
25-
await coro
20+
self._dsr = TangoDSR(self._controller_api, asyncio.get_running_loop())
21+
await asyncio.to_thread(self._dsr.run, self.tango)

src/fastcs/transports/transport.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import asyncio
21
from abc import abstractmethod
32
from dataclasses import dataclass
43
from typing import Any, ClassVar, Union
@@ -8,8 +7,7 @@
87

98
@dataclass
109
class Transport:
11-
"""A base class for transport's implementation
12-
so it can be used in FastCS."""
10+
"""A base class for transport's implementation so it can be used in FastCS."""
1311

1412
subclasses: ClassVar[list[type["Transport"]]] = []
1513

@@ -24,13 +22,12 @@ def union(cls):
2422
return Union[tuple(cls.subclasses)] # noqa: UP007
2523

2624
@abstractmethod
27-
def connect(
28-
self, controller_api: ControllerAPI, loop: asyncio.AbstractEventLoop
29-
) -> None:
30-
"""Connect the ``Transport`` to the control system
25+
def connect(self, controller_api: ControllerAPI) -> None:
26+
"""Connect the `Transport` to the control system
3127
32-
The `ControllerAPI` should be exposed over the transport. The provided event
33-
loop should be used where required instead of creating a new one.
28+
The `ControllerAPI` should be exposed over the transport. Transports that
29+
require the event loop should retrieve it with `asyncio.get_running_loop`,
30+
as this method is called from within an async context.
3431
3532
"""
3633
pass

0 commit comments

Comments
 (0)