-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy path__init__.py
More file actions
206 lines (174 loc) · 6.31 KB
/
__init__.py
File metadata and controls
206 lines (174 loc) · 6.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# Copyright 2024, Sergey Dudanov
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
from collections.abc import AsyncIterator
from typing import Any
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak.exc import BleakDeviceNotFoundError
from bleak.uuids import normalize_uuid_str
from .backends import (
ControlEvent,
FtmsCallback,
SetupEvent,
SetupEventData,
SpinDownEvent,
SpinDownEventData,
UpdateEvent,
UpdateEventData,
)
from .client import DisconnectCallback, FitnessMachine
from .const import FTMS_UUID
from .errors import NotFitnessMachineError
from .machines import get_machine
from .manager import PropertiesManager
from .properties import (
DeviceInfo,
MachineType,
MovementDirection,
SettingRange,
get_machine_type_from_service_data,
)
# Module-level logger, named after this module's import path.
_LOGGER = logging.getLogger(__name__)
def get_client(
    ble_device: BLEDevice,
    adv_or_type: AdvertisementData | MachineType,
    *,
    timeout: float = 2,
    on_ftms_event: FtmsCallback | None = None,
    on_disconnect: DisconnectCallback | None = None,
    **kwargs: Any,
) -> FitnessMachine:
    """
    Creates a `FitnessMachine` instance from [Bleak](https://bleak.readthedocs.io/) discovered
    information: device and advertisement data. Instead of advertisement data, the `MachineType` can be used.

    Parameters:
    - `ble_device` - [BLE device](https://bleak.readthedocs.io/en/latest/api/index.html#bleak.backends.device.BLEDevice).
    - `adv_or_type` - Service [advertisement data](https://bleak.readthedocs.io/en/latest/backends/index.html#bleak.backends.scanner.AdvertisementData) or `MachineType`.
    - `timeout` - Control operation timeout. Defaults to 2.0s.
    - `on_ftms_event` - Callback for receiving fitness machine events.
    - `on_disconnect` - Disconnection callback.
    - `**kwargs` - Additional keyword arguments, forwarded unchanged to the machine class constructor.

    Return:
    - `FitnessMachine` instance.

    Raises:
    - `NotFitnessMachineError` - the advertisement data carries no usable FTMS service data
      and the FTMS service UUID is not advertised either.
    """
    adv_data: AdvertisementData | None = None

    if isinstance(adv_or_type, AdvertisementData):
        adv_data = adv_or_type

        try:
            machine_type = get_machine_type_from_service_data(adv_data)
        except NotFitnessMachineError:
            # Fallback: some devices advertise FTMS UUID but omit FTMS service data.
            # If FTMS UUID is present, instantiate a placeholder client so we can
            # connect and detect the real machine type from GATT characteristics.
            # Note: post-connect code will probe notifiable data characteristics
            # (2ACD/2ACE/2AD1/2AD2) and switch the type accordingly.
            if normalize_uuid_str(FTMS_UUID) not in (adv_data.service_uuids or []):
                raise

            _LOGGER.debug(
                "FTMS UUID present but no FTMS service data; proceeding with placeholder client. Actual type will be detected after connect."
            )
            machine_type = MachineType.INDOOR_BIKE
    else:
        # The caller already knows the machine type; there is no advertisement data.
        machine_type = adv_or_type

    cls = get_machine(machine_type)

    # Forward the extra options as real keyword arguments. Passing them as
    # `kwargs=kwargs` would hand the constructor a single keyword named
    # "kwargs" holding a dict, so the individual options would never be seen.
    # NOTE(review): assumes the machine constructors accept arbitrary keyword
    # arguments (`**kwargs`) - confirm against `FitnessMachine.__init__`.
    return cls(
        ble_device,
        adv_data,
        timeout=timeout,
        on_ftms_event=on_ftms_event,
        on_disconnect=on_disconnect,
        **kwargs,
    )
async def discover_ftms_devices(
    discover_time: float = 10,
    **kwargs: Any,
) -> AsyncIterator[tuple[BLEDevice, MachineType]]:
    """
    Discover FTMS devices.

    Scans for advertisements that include the FTMS service UUID and yields each
    device once, the first time its advertisement carries FTMS service data from
    which a machine type can be determined. Advertisements without such service
    data are skipped.

    The `discover_time` budget covers the whole iteration, including the time the
    consumer spends between received items.

    Parameters:
    - `discover_time` - Discover time. Defaults to 10s.
    - `**kwargs` - Additional keyword arguments, forwarded unchanged to `BleakScanner`.

    Return:
    - `AsyncIterator[tuple[BLEDevice, MachineType]]` async generator of `BLEDevice` and `MachineType` tuples.
    """
    # Addresses already reported, so every device is yielded at most once.
    devices: set[str] = set()

    # Forward caller options as real keyword arguments; `kwargs=kwargs` would
    # pass the scanner one keyword named "kwargs" instead of the options themselves.
    async with BleakScanner(
        service_uuids=[normalize_uuid_str(FTMS_UUID)],
        **kwargs,
    ) as scanner:
        try:
            async with asyncio.timeout(discover_time):
                async for dev, adv in scanner.advertisement_data():
                    if dev.address in devices:
                        continue

                    try:
                        machine_type = get_machine_type_from_service_data(adv)
                    except NotFitnessMachineError:
                        # No usable FTMS service data in this advertisement.
                        continue

                    devices.add(dev.address)

                    _LOGGER.debug(
                        " #%d - %s: address='%s', name='%s'",
                        len(devices),
                        machine_type.name,
                        dev.address,
                        dev.name,
                    )

                    yield dev, machine_type

        except asyncio.TimeoutError:
            # Running out of discovery time is the normal way for the scan to end.
            pass
async def get_client_from_address(
    address: str,
    *,
    scan_timeout: float = 10,
    timeout: float = 2,
    on_ftms_event: FtmsCallback | None = None,
    on_disconnect: DisconnectCallback | None = None,
    **kwargs: Any,
) -> FitnessMachine:
    """
    Scans for fitness machine with specified BLE address. On success creates and returns a `FitnessMachine` instance.

    Parameters:
    - `address` - The Bluetooth address of the device on this machine (UUID on macOS).
      Compared case-insensitively.
    - `scan_timeout` - Scanning timeout. Defaults to 10.0s.
    - `timeout` - Control operation timeout. Defaults to 2.0s.
    - `on_ftms_event` - Callback for receiving fitness machine events.
    - `on_disconnect` - Disconnection callback.
    - `**kwargs` - Additional keyword arguments, forwarded unchanged to both
      `discover_ftms_devices` and `get_client`.

    Return:
    - `FitnessMachine` instance if device found successfully.

    Raises:
    - `BleakDeviceNotFoundError` - discovery finished without yielding a device with this address.
    """
    from contextlib import aclosing

    wanted = address.lower()

    # `aclosing` guarantees the discovery generator is closed before this
    # function returns. Without it, an early `return` from the loop leaves
    # the generator to be finalized later by the event loop.
    async with aclosing(discover_ftms_devices(scan_timeout, **kwargs)) as found:
        async for dev, machine_type in found:
            if dev.address.lower() != wanted:
                continue

            # Forward the extra options as real keyword arguments rather than
            # as a single keyword named "kwargs".
            return get_client(
                dev,
                machine_type,
                timeout=timeout,
                on_ftms_event=on_ftms_event,
                on_disconnect=on_disconnect,
                **kwargs,
            )

    raise BleakDeviceNotFoundError(address)
# Public names of the package: the list consulted by `from <package> import *`.
__all__ = [
    # Functions defined in this module.
    "discover_ftms_devices",
    "get_client",
    "get_client_from_address",
    # Re-exported from `.properties` and `.errors`.
    "MachineType",
    "NotFitnessMachineError",
    # Re-exported from `.backends`.
    "UpdateEvent",
    "SetupEvent",
    "ControlEvent",
    "SpinDownEvent",
    "FtmsCallback",
    "SetupEventData",
    "UpdateEventData",
    "SpinDownEventData",
    # Re-exported from `.properties`.
    "MovementDirection",
    "DeviceInfo",
    "SettingRange",
    # Re-exported from `.manager` and `.client`.
    "PropertiesManager",
    "DisconnectCallback",
]