-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathtwitch_backend.py
More file actions
431 lines (348 loc) · 14.1 KB
/
twitch_backend.py
File metadata and controls
431 lines (348 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs, urlencode
import threading
import requests
from datetime import datetime, timedelta
from collections import deque
from functools import wraps
from time import sleep
from typing import Callable, Any, Optional
from collections.abc import Sequence
from loguru import logger as log
from twitchpy.client import Client
from streamcontroller_plugin_tools import BackendBase
from constants import (
OAUTH_REDIRECT_URI,
OAUTH_PORT,
RATE_LIMIT_CALLS,
RATE_LIMIT_PERIOD,
)
class RateLimiter:
"""Thread-safe rate limiter using a sliding window algorithm.
This decorator limits the number of function calls within a specified time period.
It uses a sliding window approach where old calls outside the time window are
automatically removed, and new calls are blocked if the limit is reached.
Args:
max_calls: Maximum number of calls allowed within the time period
period: Time period in seconds for the rate limit window
Example:
@RateLimiter(max_calls=100, period=60)
def api_call():
# This function will be limited to 100 calls per 60 seconds
pass
"""
def __init__(self, max_calls: int, period: float) -> None:
self.max_calls: int = max_calls
self.period: float = period
self.calls: deque[datetime] = deque()
self.lock: threading.Lock = threading.Lock()
def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
with self.lock:
now = datetime.now()
# Remove calls outside the time window
while (
self.calls and (now - self.calls[0]).total_seconds() > self.period
):
self.calls.popleft()
# Check if we've hit the rate limit
if len(self.calls) >= self.max_calls:
# Calculate how long to wait
oldest_call = self.calls[0]
wait_time = self.period - (now - oldest_call).total_seconds()
if wait_time > 0:
log.warning(
f"Rate limit reached, waiting {wait_time:.2f} seconds"
)
sleep(wait_time)
# Clean up old calls again after waiting
now = datetime.now()
while (
self.calls
and (now - self.calls[0]).total_seconds() > self.period
):
self.calls.popleft()
# Record this call
self.calls.append(datetime.now())
return func(*args, **kwargs)
return wrapper
def make_handler(plugin_backend: "Backend") -> type[BaseHTTPRequestHandler]:
class AuthHandler(BaseHTTPRequestHandler):
def do_GET(self) -> None:
if not self.path.startswith("/auth"):
self.send_response(201)
return
url_parts = urlparse(self.path)
query_params = parse_qs(url_parts.query)
if "error" in query_params:
message = (
query_params["error_description"]
if "error_description" in query_params
else "Something went wrong!"
)
status = 500
else:
message = "Success! You may now close this browser window"
status = 200
shutdown = threading.Thread(target=self.server.shutdown, daemon=True)
shutdown.start()
self.protocol_version = "HTTP/1.1"
self.send_response(status)
self.send_header("Content-Length", len(message))
self.end_headers()
self.wfile.write(bytes(message, "utf8"))
if status != 200:
plugin_backend.auth_failed()
return
plugin_backend.new_code(query_params["code"][0])
return AuthHandler
class Backend(BackendBase):
"""Backend for Twitch API integration.
Handles authentication, API calls, and rate limiting for Twitch operations.
All API methods are automatically rate-limited to prevent exceeding Twitch's
API limits.
"""
def __init__(self) -> None:
super().__init__()
self.twitch: Optional[Client] = None
self.user_id: Optional[str] = None
self.token_path: Optional[str] = None
self.client_secret: Optional[str] = None
self.client_id: Optional[str] = None
self.httpd: Optional[HTTPServer] = None
self.httpd_thread: Optional[threading.Thread] = None
self.auth_code: Optional[str] = None
self.cached_channels: dict[str, str] = {}
self.rate_limiter: RateLimiter = RateLimiter(
RATE_LIMIT_CALLS, RATE_LIMIT_PERIOD
)
def set_token_path(self, path: str) -> None:
self.token_path = path
def on_disconnect(self, conn: Any) -> None:
if self.httpd is not None:
try:
self.httpd.shutdown()
self.httpd.server_close()
except Exception as ex:
log.error(f"Error shutting down HTTP server: {ex}")
self.httpd = None
self.httpd_thread = None
super().on_disconnect(conn)
def get_channel_id(self, user_name: str) -> Optional[str]:
"""Get Twitch channel ID from username.
Args:
user_name: Twitch username to look up
Returns:
Channel ID if found, None otherwise. Results are cached for performance.
"""
if not user_name:
return None
if user_name in self.cached_channels:
return self.cached_channels[user_name]
@self.rate_limiter
def _get_users() -> Sequence[Any]:
return self.twitch.get_users(None, [user_name])
users = _get_users()
if users:
channel_id = users[0].user_id
self.cached_channels[user_name] = channel_id
return str(channel_id)
return None
def create_clip(self) -> None:
"""Create a clip of the current live stream."""
if not self.twitch:
return
self.validate_auth()
@self.rate_limiter
def _create_clip() -> Any:
return self.twitch.create_clip(self.user_id)
_create_clip()
def create_marker(self) -> None:
if not self.twitch:
return
self.validate_auth()
@self.rate_limiter
def _create_marker() -> Any:
return self.twitch.create_stream_marker(self.user_id)
_create_marker()
def get_viewers(self) -> str:
if not self.twitch:
return ""
self.validate_auth()
@self.rate_limiter
def _get_streams() -> Sequence[Any]:
return self.twitch.get_streams(first=1, user_id=self.user_id)
streams = _get_streams()
if not streams:
return "Not Live"
return str(streams[0].viewer_count)
def toggle_chat_mode(self, mode: str) -> bool:
if not self.twitch:
return False
self.validate_auth()
@self.rate_limiter
def _get_settings() -> Any:
return self.twitch.get_chat_settings(self.user_id, self.user_id)
@self.rate_limiter
def _update_settings(updated_value: bool) -> Any:
return self.twitch.update_chat_settings(
self.user_id, self.user_id, **{mode: updated_value}
)
current = _get_settings()
updated = not getattr(current, mode)
_update_settings(updated)
return updated
def get_chat_settings(self) -> dict[str, bool]:
if not self.twitch:
return {}
self.validate_auth()
@self.rate_limiter
def _get_settings() -> Any:
return self.twitch.get_chat_settings(self.user_id, self.user_id)
current = _get_settings()
return {
"subscriber_mode": current.subscriber_mode,
"follower_mode": current.follower_mode,
"emote_mode": current.emote_mode,
"slow_mode": current.slow_mode,
}
def send_message(self, message: str, user_name: str) -> None:
if not self.twitch:
return
self.validate_auth()
channel_id = self.get_channel_id(user_name) or self.user_id
@self.rate_limiter
def _send_message() -> Any:
return self.twitch.send_chat_message(channel_id, self.user_id, message)
_send_message()
def snooze_ad(self) -> None:
if not self.twitch:
return
self.validate_auth()
@self.rate_limiter
def _snooze_ad() -> Any:
return self.twitch.snooze_next_ad(self.user_id)
_snooze_ad()
def play_ad(self, length: int) -> None:
if not self.twitch:
return
self.validate_auth()
@self.rate_limiter
def _start_commercial() -> Any:
return self.twitch.start_commercial(self.user_id, length)
_start_commercial()
def get_next_ad(self) -> tuple[datetime, int]:
if not self.twitch:
return datetime.now() - timedelta(minutes=1), -1
self.validate_auth()
@self.rate_limiter
def _get_ad_schedule() -> Any:
return self.twitch.get_ad_schedule(self.user_id)
schedule = _get_ad_schedule()
return schedule.next_ad_at, schedule.snooze_count
def send_shoutout(self, target_username: str) -> None:
"""Send a shoutout to the specified user.
Args:
target_username: The username of the broadcaster to shout out
Raises:
Exception: If the user is not found or the shoutout fails
"""
if not self.twitch:
raise Exception("Not authenticated")
self.validate_auth()
# Resolve username to user ID
target_id = self.get_channel_id(target_username)
if not target_id:
raise Exception(f"User '{target_username}' not found")
@self.rate_limiter
def _send_shoutout() -> None:
return self.twitch.send_a_shoutout(
from_broadcaster_id=self.user_id,
to_broadcaster_id=target_id,
moderator_id=self.user_id,
)
_send_shoutout()
def update_client_credentials(self, client_id: str, client_secret: str) -> None:
if None in (client_id, client_secret) or "" in (client_id, client_secret):
return
self.client_id = client_id
self.client_secret = client_secret
params = {
"client_id": client_id,
"redirect_uri": OAUTH_REDIRECT_URI,
"response_type": "code",
"scope": "user:write:chat channel:manage:broadcast moderator:manage:chat_settings clips:edit channel:read:subscriptions channel:edit:commercial channel:manage:ads channel:read:ads moderator:manage:shoutouts",
}
encoded_params = urlencode(params)
# Clean up existing server if it exists
if self.httpd is not None:
try:
self.httpd.shutdown()
self.httpd.server_close()
except Exception as ex:
log.error(f"Error shutting down existing HTTP server: {ex}")
# Create new server
try:
self.httpd = HTTPServer(("localhost", OAUTH_PORT), make_handler(self))
except Exception as ex:
log.error(f"Failed to create HTTP server on port {OAUTH_PORT}: {ex}")
self.auth_failed("Failed to start local authentication server")
return
# Create and start server thread
if not self.httpd_thread or not self.httpd_thread.is_alive():
self.httpd_thread = threading.Thread(
target=self.httpd.serve_forever, daemon=True
)
if not self.httpd_thread.is_alive():
self.httpd_thread.start()
url = f"https://id.twitch.tv/oauth2/authorize?{encoded_params}"
check = requests.get(url, timeout=5)
if check.status_code != 200:
message = (
check.json().get("message") if check.json() else "Incorrect Client ID"
)
self.auth_failed(message)
return
webbrowser.open(f"https://id.twitch.tv/oauth2/authorize?{encoded_params}")
def new_code(self, auth_code: str) -> None:
self.auth_with_code(self.client_id, self.client_secret, auth_code)
def validate_auth(self) -> None:
try:
@self.rate_limiter
def _validate() -> Sequence[Any]:
return self.twitch.get_streams(first=1, user_id=self.user_id)
_ = _validate()
except Exception as ex:
self.auth_with_code(self.client_id, self.client_secret, self.auth_code)
def auth_with_code(
self, client_id: str, client_secret: str, auth_code: str
) -> None:
try:
self.twitch = Client(
client_id=client_id,
client_secret=client_secret,
tokens_path=self.token_path,
redirect_uri=OAUTH_REDIRECT_URI,
authorization_code=auth_code,
)
@self.rate_limiter
def _get_users() -> Sequence[Any]:
return self.twitch.get_users()
users = _get_users()
self.auth_code = auth_code
self.user_id = users[0].user_id
self.client_id = client_id
self.client_secret = client_secret
self.frontend.save_auth_settings(client_id, client_secret, auth_code)
self.frontend.on_auth_callback(True)
except Exception as e:
log.error("failed to authenticate", e)
self.auth_failed()
def auth_failed(self, message: str = "") -> None:
self.user_id = None
self.frontend.on_auth_callback(False, message)
def is_authed(self) -> bool:
return self.user_id != None
backend = Backend()