-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbroadband_source.py
More file actions
136 lines (110 loc) · 4.34 KB
/
broadband_source.py
File metadata and controls
136 lines (110 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import asyncio
import random
import time
import zmq
from synapse.api.node_pb2 import NodeType
from synapse.api.nodes.broadband_source_pb2 import BroadbandSourceConfig
from synapse.server.nodes.base import BaseNode
from synapse.api.tap_pb2 import TapConnection, TapType
from synapse.api.datatype_pb2 import BroadbandFrame
from synapse.server.status import Status
from synapse.utils.types import ElectricalBroadbandData
def r_sample(bit_width: int):
return random.randint(0, 2**bit_width - 1)
class BroadbandSource(BaseNode):
def __init__(self, id):
super().__init__(id, NodeType.kBroadbandSource)
self.__config: BroadbandSourceConfig = None
self.zmq_context = None
self.zmq_socket = None
self.seq_number = 0
self.iface_ip = None
self.port = None
def config(self):
c = super().config()
if self.__config:
c.broadband_source.CopyFrom(self.__config)
return c
def configure(
self, config: BroadbandSourceConfig = BroadbandSourceConfig()
) -> Status:
self.__config = config
return Status()
async def run(self):
if not self.__config:
self.logger.error("node not configured")
return
c = self.__config
if not c.HasField("signal") or not c.signal:
self.logger.error("node signal not configured")
return
if not c.signal.HasField("electrode") or not c.signal.electrode:
self.logger.error("node signal electrode not configured")
return
e = c.signal.electrode
if not e.channels:
self.logger.error("node signal electrode channels not configured")
return
if not self.zmq_context:
if not self.iface_ip:
self.logger.error("iface_ip not configured")
return
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.PUB)
self.port = self.zmq_socket.bind_to_random_port(f"tcp://{self.iface_ip}")
channels = e.channels
bit_width = c.bit_width if c.bit_width else 4
sample_rate_hz = c.sample_rate_hz if c.sample_rate_hz else 16000
t_last_ns = time.time_ns()
while self.running:
await asyncio.sleep(0.01)
now = time.time_ns()
elapsed_ns = now - t_last_ns
n_samples = int(sample_rate_hz * elapsed_ns / 1e9)
samples = [[ch.id, [r_sample(bit_width) for _ in range(n_samples)]] for ch in channels]
try:
# for backwards compatibility
data = ElectricalBroadbandData(
bit_width=bit_width,
is_signed=False,
sample_rate=sample_rate_hz,
t0=t_last_ns,
samples=samples
)
await self.emit_data(data)
# send data over tap
for i in range(n_samples):
frame = BroadbandFrame(
timestamp_ns = t_last_ns + int(i * 1e9 / sample_rate_hz),
sequence_number = self.seq_number,
frame_data = [chan_samples[i] for _, chan_samples in samples],
sample_rate_hz = sample_rate_hz,
)
try:
self.zmq_socket.send(frame.SerializeToString())
self.seq_number += 1
except Exception as e:
self.logger.error(f"Error sending data: {e}")
t_last_ns = now
except Exception as e:
print(f"Error sending data: {e}")
def stop(self):
"""Clean up ZMQ resources."""
if self.zmq_socket:
self.zmq_socket.close()
self.zmq_socket = None
if self.zmq_context:
self.zmq_context.destroy()
self.zmq_context = None
return super().stop()
def configure_iface_ip(self, iface_ip):
self.iface_ip = iface_ip
def tap_connections(self):
return [
TapConnection(
name="broadband_source_sim",
endpoint=f"tcp://{self.iface_ip}:{self.port}",
message_type="synapse.BroadbandFrame",
tap_type=TapType.TAP_TYPE_PRODUCER,
)
]