-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathsender.py
More file actions
157 lines (138 loc) · 5.92 KB
/
sender.py
File metadata and controls
157 lines (138 loc) · 5.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Send data to USB-Stick.
Serialize request message and pass data stream to legacy Plugwise USB-Stick
Wait for stick to respond.
When request is accepted by USB-Stick, return the Sequence ID of the session.
process flow
1. Send(request)
1. wait for lock
1. convert (serialize) request message into bytes
1. send data to serial port
1. wait for stick reply (accept, timeout, failed)
1. when accept, return sequence id for response message of node
"""
from __future__ import annotations
from asyncio import Future, Lock, Transport, get_running_loop, timeout
from collections.abc import Callable
import logging
from ..constants import STICK_TIME_OUT
from ..exceptions import StickError
from ..messages.requests import PlugwiseRequest
from ..messages.responses import StickResponse, StickResponseType
from .receiver import StickReceiver
_LOGGER = logging.getLogger(__name__)
class StickSender:
"""Send request messages though USB Stick transport connection."""
def __init__(self, stick_receiver: StickReceiver, transport: Transport) -> None:
"""Initialize the Stick Sender class."""
self._loop = get_running_loop()
self._receiver = stick_receiver
self._transport = transport
self._stick_response: Future[StickResponse] | None = None
self._stick_lock = Lock()
self._current_request: None | PlugwiseRequest = None
self._unsubscribe_stick_response: Callable[[], None] | None = None
async def start(self) -> None:
"""Start the sender."""
# Subscribe to ACCEPT stick responses, which contain the seq_id we need.
# Other stick responses are not related to this request.
self._unsubscribe_stick_response = (
await self._receiver.subscribe_to_stick_responses(
self._process_stick_response,
None,
(
StickResponseType.ACCEPT,
StickResponseType.TIMEOUT,
StickResponseType.FAILED,
),
)
)
async def write_request_to_port(self, request: PlugwiseRequest) -> None:
"""Send message to serial port of USB stick."""
if self._transport is None:
raise StickError("USB-Stick transport missing.")
await self._stick_lock.acquire()
self._current_request = request
self._stick_response = self._loop.create_future()
request.add_send_attempt()
_LOGGER.info("Sending %s", request)
# Write message to serial port buffer
serialized_data = request.serialize()
_LOGGER.debug(
"write_request_to_port | Write %s to port as %s", request, serialized_data
)
self._transport.write(serialized_data)
# Don't timeout when no response expected
if not request.no_response:
request.start_response_timeout()
# Wait for USB stick to accept request
try:
async with timeout(STICK_TIME_OUT):
response: StickResponse = await self._stick_response
except TimeoutError:
_LOGGER.warning(
"USB-Stick did not respond within %s seconds after writing %s",
STICK_TIME_OUT,
request,
)
request.assign_error(
BaseException(
StickError(
f"USB-Stick did not respond within {STICK_TIME_OUT} seconds after writing {request}"
)
)
)
except BaseException as exc: # pylint: disable=broad-exception-caught
_LOGGER.warning("Exception for %s: %s", request, exc)
request.assign_error(exc)
else:
_LOGGER.debug(
"write_request_to_port | USB-Stick replied with %s to request %s",
response,
request,
)
if response.response_type == StickResponseType.ACCEPT:
if request.seq_id is not None:
request.assign_error(
BaseException(
StickError(f"USB-Stick failed communication for {request}")
)
)
else:
request.seq_id = response.seq_id
await request.subscribe_to_response(
self._receiver.subscribe_to_stick_responses,
self._receiver.subscribe_to_node_responses,
)
_LOGGER.debug(
"write_request_to_port | request has subscribed : %s", request
)
elif response.response_type == StickResponseType.TIMEOUT:
_LOGGER.warning(
"USB-Stick directly responded with communication timeout for %s",
request,
)
request.assign_error(
BaseException(
StickError(f"USB-Stick responded with timeout for {request}")
)
)
elif response.response_type == StickResponseType.FAILED:
_LOGGER.warning("USB-Stick failed communication for %s", request)
request.assign_error(
BaseException(
StickError(f"USB-Stick failed communication for {request}")
)
)
finally:
self._stick_response.cancel()
self._stick_lock.release()
async def _process_stick_response(self, response: StickResponse) -> None:
"""Process stick response."""
if self._stick_response is None or self._stick_response.done():
return
_LOGGER.debug("Received %s as reply to %s", response, self._current_request)
self._stick_response.set_result(response)
def stop(self) -> None:
"""Stop sender."""
if self._unsubscribe_stick_response is not None:
self._unsubscribe_stick_response()