-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathmini_ticker.py
More file actions
70 lines (57 loc) · 2.22 KB
/
mini_ticker.py
File metadata and controls
70 lines (57 loc) · 2.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import asyncio
import logging
import time
from typing import Dict, Any
from aster.lib.utils import config_logging
from aster.websocket.client.stream import WebsocketClient as Client
# Set up logging for the client application
config_logging(logging, logging.DEBUG)
logger = logging.getLogger(__name__)
# --- Handlers ---
def message_handler(message: Dict[str, Any]) -> None:
"""
Handles incoming messages from the WebSocket stream.
Args:
message: The parsed message payload (usually JSON).
"""
logger.info(f"Received message: {message}")
# --- Main Application Logic ---
async def main_async():
"""
Asynchronously runs the WebSocket client, subscribes to a stream,
and keeps the connection alive for a set duration.
"""
client = Client()
try:
logger.debug("Starting WebSocket client...")
# Start the client (likely runs in a separate thread/task internally)
client.start()
# Subscribe to a stream
logger.info("Subscribing to mini_ticker stream for BTCUSDT...")
client.mini_ticker(
id=1,
callback=message_handler,
symbol="btcusdt"
)
# Keep the main coroutine alive to allow the client thread/task to run
# Use asyncio.sleep for non-blocking wait instead of time.sleep
logger.info("Waiting for 10 seconds of streaming data...")
await asyncio.sleep(10)
except Exception as e:
logger.error(f"An error occurred during client execution: {e}", exc_info=True)
finally:
# Ensure the client connection is closed cleanly, regardless of errors
logger.debug("Closing WebSocket connection.")
client.stop()
logger.info("Client stopped successfully.")
if __name__ == "__main__":
# The client runs synchronously if not wrapped, but using asyncio.run
# provides a cleaner execution context, especially for cleanup.
try:
asyncio.run(main_async())
except KeyboardInterrupt:
logger.warning("Program interrupted by user.")
except RuntimeError as e:
# Handle the common RuntimeError when asyncio tries to close the loop
if "Event loop is closed" not in str(e):
raise