-
Notifications
You must be signed in to change notification settings - Fork 237
Expand file tree
/
Copy pathmock_handler.py
More file actions
154 lines (125 loc) · 4.71 KB
/
mock_handler.py
File metadata and controls
154 lines (125 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""
MockHandler for PSLab.
This module provides a software simulation of the PSLab hardware interface.
It allows developers to test signal processing and communication logic
without physical hardware by injecting custom response data.
"""
import logging
import struct
from typing import Dict, Optional, Union
class MockHandler:
    """
    A mock communication handler that simulates the PSLab hardware.

    Data written to the handler is compared against registered command
    prefixes; when one matches, its canned response is queued in an
    internal read buffer that the ``read*`` methods consume.

    Attributes
    ----------
    connected : bool
        Whether the simulated connection is currently open.
    port : str
        Name of the simulated port (``"MOCK_PORT"`` when none is given).
    response_map : Dict[bytes, bytes]
        Mapping of command prefix to the response queued when it is written.
    read_buffer : bytearray
        Bytes waiting to be returned by :meth:`read`.
    logger : logging.Logger
        Logger named after this module.
    """

    def __init__(self, port: Optional[str] = None):
        """
        Create a disconnected mock handler.

        Parameters
        ----------
        port : str, optional
            Port name to report. Falls back to ``"MOCK_PORT"`` when omitted
            or empty.
        """
        self.connected = False
        self.port = port or "MOCK_PORT"
        self.response_map: Dict[bytes, bytes] = {}
        self.read_buffer = bytearray()
        self.logger = logging.getLogger(__name__)

    def open(self, port: Optional[str] = None) -> None:
        """
        Simulate opening the connection.

        Parameters
        ----------
        port : str, optional
            The port to connect to. Updates the handler's port if provided.
        """
        if port:
            self.port = port
        self.connected = True
        self.logger.info("MockHandler connected on %s", self.port)

    def close(self) -> None:
        """Simulate closing the connection and discard any unread bytes."""
        self.connected = False
        self.read_buffer.clear()
        self.logger.info("MockHandler disconnected.")

    def write(self, data: bytes) -> int:
        """
        Simulate writing ``data`` to the device.

        Parameters
        ----------
        data : bytes
            The bytes to send.

        Returns
        -------
        int
            The number of bytes "written", i.e. ``len(data)``.

        Raises
        ------
        RuntimeError
            If the handler is not connected.
        """
        if not self.connected:
            raise RuntimeError("Attempted to write to closed MockHandler.")

        # Commands are tried in the dict's iteration (insertion) order and
        # only the first prefix match queues its response.
        for command, response in self.response_map.items():
            if data.startswith(command):
                self.read_buffer.extend(response)
                break
        else:
            self.logger.debug(
                "Write: %s (No specific response mapped)", data.hex()
            )
        return len(data)

    def read(self, size: int = 1) -> bytes:
        """
        Read bytes from the mock buffer.

        Parameters
        ----------
        size : int
            Number of bytes to read.

        Returns
        -------
        bytes
            The data read from the buffer. Shorter than ``size`` (possibly
            empty) when fewer bytes are available.

        Raises
        ------
        RuntimeError
            If the handler is not connected.
        """
        if not self.connected:
            raise RuntimeError("Attempted to read from closed MockHandler.")

        available = len(self.read_buffer)
        if available < size:
            self.logger.warning(
                "MockHandler: Requested %s bytes, but only %s available.",
                size,
                available,
            )

        # Slicing past the end yields whatever is left, so a short read
        # returns the remainder and leaves the buffer empty. Deleting in
        # place keeps ``read_buffer`` the same object across reads.
        data = bytes(self.read_buffer[:size])
        del self.read_buffer[:size]
        return data

    def clear_buffer(self) -> None:
        """Discard all unread bytes from the mock buffer."""
        self.read_buffer.clear()

    # --- Interface Compatibility Methods ---

    def send_byte(self, val: Union[int, bytes]) -> None:
        """
        Send a single byte to the device.

        Parameters
        ----------
        val : int or bytes
            An integer (0-255), which is converted to one byte, or a
            ``bytes`` object, which is written as-is.

        Raises
        ------
        TypeError
            If ``val`` is neither ``int`` nor ``bytes``.
        """
        if isinstance(val, int):
            self.write(bytes([val]))
        elif isinstance(val, bytes):
            self.write(val)
        else:
            raise TypeError(f"send_byte expects int or bytes, got {type(val)}")

    def send_int(self, val: int) -> None:
        """Send ``val`` to the device as a little-endian unsigned 16-bit integer."""
        self.write(struct.pack('<H', val))  # Little-endian unsigned short

    def read_byte(self) -> int:
        """
        Read a single byte and return it as an integer.

        Returns
        -------
        int
            The byte value, or ``0`` when the buffer is empty.
        """
        data = self.read(1)
        if not data:
            return 0
        return data[0]

    def read_int(self) -> int:
        """
        Read two bytes and interpret them as a little-endian unsigned 16-bit integer.

        Returns
        -------
        int
            The decoded value, or ``0`` when fewer than two bytes were
            available (any partial byte read is consumed).
        """
        data = self.read(2)
        if len(data) < 2:
            return 0
        return struct.unpack('<H', data)[0]

    def get_ack(self) -> bool:
        """
        Simulate receiving an acknowledgement from the device.

        Returns
        -------
        bool
            Always ``True``, which compares equal to the ACK value ``0x01``.
        """
        return True

    # --- Developer Control Methods ---

    def register_response(self, command: bytes, response: bytes) -> None:
        """
        Register a canned response for a command prefix.

        Parameters
        ----------
        command : bytes
            Prefix that written data must start with to trigger the response.
        response : bytes
            Bytes appended to the read buffer when the command matches.
        """
        self.response_map[command] = response

    def inject_data(self, data: bytes) -> None:
        """
        Append ``data`` directly to the read buffer.

        Parameters
        ----------
        data : bytes
            Bytes that subsequent reads should return.
        """
        self.read_buffer.extend(data)