-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathfan_example.py
More file actions
188 lines (143 loc) · 5.45 KB
/
fan_example.py
File metadata and controls
188 lines (143 loc) · 5.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""SinricPro Fan Example - Ceiling/desk fan control with speed settings."""
import asyncio
import os
from typing import Any
from sinricpro import SinricPro, SinricProFan, SinricProConfig
# Device ID from SinricPro portal
DEVICE_ID = "YOUR_DEVICE_ID_HERE" # Replace with your device ID
# Credentials from SinricPro portal
APP_KEY = os.getenv("SINRICPRO_APP_KEY", "YOUR_APP_KEY_HERE")
APP_SECRET = os.getenv("SINRICPRO_APP_SECRET", "YOUR_APP_SECRET_HERE")
power_state = False # Current power state
fan_speed = 0 # Current fan speed (0-100 or 0-5 for discrete levels)
async def on_power_state(state: bool) -> bool:
"""
Handle power state change requests.
Args:
state: True for ON, False for OFF
Returns:
True if successful, False otherwise
"""
global power_state
print(f"\n[Callback] Power: {'ON' if state else 'OFF'}")
power_state = state
# TODO: Control your physical fan
# Example with GPIO relay:
# GPIO.output(RELAY_PIN, GPIO.HIGH if state else GPIO.LOW)
print(f"[Hardware] Fan turned {'on' if state else 'off'}")
return True
async def on_range_value(speed: int, instance_id: str) -> bool:
"""
Handle fan speed change requests.
Args:
speed: Fan speed level (0-100)
instance_id: Instance ID for multi-instance range control (not used for fan)
Returns:
True if successful, False otherwise
"""
global fan_speed
print(f"\n[Callback] Fan Speed: {speed}")
fan_speed = speed
# TODO: Control your fan speed
# Example 1: PWM control (continuous 0-100)
# pwm.ChangeDutyCycle(speed)
# Example 2: Discrete speed levels (convert 0-100 to 0-5)
# discrete_speed = int(speed / 20) # 0-5 levels
# set_discrete_speed(discrete_speed)
print(f"[Hardware] Fan speed set to {speed}")
return True
async def on_adjust_range_value(speed_delta: int, instance_id: str) -> bool:
"""
Handle relative fan speed adjustment requests.
Args:
speed_delta: Change in speed (-100 to +100)
instance_id: Instance ID for multi-instance range control (not used for fan)
Returns:
True if successful, False otherwise
"""
global fan_speed
new_speed = max(0, min(4, fan_speed + speed_delta))
print(f"\n[Callback] Adjust fan speed by {speed_delta:+d} → {new_speed}")
fan_speed = new_speed
# TODO: Adjust your fan speed
# pwm.ChangeDutyCycle(new_speed)
print(f"[Hardware] Fan speed adjusted to {new_speed}")
return True
async def on_setting(setting: str, value: Any) -> bool:
"""
Handle device setting changes.
Args:
setting: Setting name (e.g., "oscillate", "direction", "timer")
value: Setting value
Returns:
True if successful, False otherwise
"""
print(f"\n[Setting] {setting} = {value}")
return True
async def simulate_physical_control(fan: SinricProFan) -> None:
"""Simulate physical fan speed control and send events."""
await asyncio.sleep(10) # Wait 10 seconds
print("\n[Simulating physical speed dial adjustment to speed 4]")
global fan_speed
fan_speed = 4
# Send speed update event to SinricPro
success = await fan.send_range_value_event(fan_speed)
if success:
print(f"[Event] Fan speed event sent: {fan_speed}")
else:
print("[Event] Failed to send speed event (rate limited or disconnected)")
async def main() -> None:
# Create SinricPro instance and fan device
sinric_pro = SinricPro.get_instance()
fan = SinricProFan(DEVICE_ID)
# Register callbacks
fan.on_power_state(on_power_state)
fan.on_range_value(on_range_value)
fan.on_adjust_range_value(on_adjust_range_value)
fan.on_setting(on_setting)
# Add device to SinricPro
sinric_pro.add(fan)
# Configure connection
config = SinricProConfig(app_key=APP_KEY, app_secret=APP_SECRET)
try:
print("=" * 60)
print("SinricPro Fan Example")
print("=" * 60)
print("\nConnecting to SinricPro...")
await sinric_pro.begin(config)
print("✓ Connected! Fan is ready.")
print("\n" + "=" * 60)
print("Voice Commands:")
print("=" * 60)
print(" • 'Alexa, turn on [device name]'")
print(" • 'Alexa, turn off [device name]'")
print(" • 'Alexa, set [device name] to 50 percent'")
print(" • 'Alexa, increase [device name]'")
print(" • 'Alexa, decrease [device name]'")
print("\n" + "=" * 60)
print("Features:")
print("=" * 60)
print(" • Power State Control: On/Off")
print(" • Fan Speed Control: 0-100 (continuous or discrete)")
print(" • Relative Speed Adjustments: Increase/Decrease")
print(" • Setting Controller: Oscillation, Direction, Timer")
print(" • Push Notifications: Alert user of events")
print(" • Event Reporting: Report physical speed dial changes")
print("\n" + "=" * 60)
print("Press Ctrl+C to exit")
print("=" * 60)
# Start physical control simulation
asyncio.create_task(simulate_physical_control(fan))
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
print("\n\nShutting down...")
except Exception as e:
print(f"\nError: {e}")
import traceback
traceback.print_exc()
finally:
await sinric_pro.stop()
print("Disconnected.")
if __name__ == "__main__":
asyncio.run(main())