Skip to content

Commit 9edac68

Browse files
committed
feat: switch StreamOut node to UDP unicast; update synapse-api
BREAKING CHANGE: support UDP unicast in StreamOut node, remove UDP multicast
1 parent 423ebc5 commit 9edac68

10 files changed

Lines changed: 323 additions & 78 deletions

File tree

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
VCPKG_MANIFEST_FEATURES ?= examples\;tests
2+
13
.PHONY: all
24
all: clean configure build
35

examples/stream_out/main.cpp

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,16 @@ using synapse::NodeType;
1818
using synapse::Signal;
1919
using synapse::StreamOut;
2020
using synapse::SynapseData;
21+
using synapse::NodeConfig;
22+
using synapse::Node;
2123

2224

23-
auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> science::Status {
25+
auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr, const std::string& dest_addr) -> science::Status {
26+
const uint32_t N_CHANNELS = 10;
2427
if (stream_out_ptr == nullptr) {
2528
return { science::StatusCode::kInvalidArgument, "stream out pointer is null" };
2629
}
2730

28-
std::string group = "224.0.0.10";
2931
science::Status s;
3032
DeviceInfo info;
3133
s = device.info(&info);
@@ -39,8 +41,8 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
3941
}
4042
};
4143
auto& electrodes = std::get<Electrodes>(signal.signal);
42-
electrodes.channels.reserve(19);
43-
for (unsigned int i = 0; i < 19; i++) {
44+
electrodes.channels.reserve(N_CHANNELS);
45+
for (unsigned int i = 0; i < N_CHANNELS; i++) {
4446
electrodes.channels.push_back(Ch{
4547
.id = i,
4648
.electrode_id = i * 2,
@@ -50,7 +52,23 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
5052

5153
Config config;
5254
auto broadband_source = std::make_shared<synapse::BroadbandSource>(1, 16, 30000, 20.0, signal);
53-
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
55+
56+
// Create StreamOut with explicit configuration
57+
NodeConfig stream_out_config;
58+
auto* stream_out_proto = stream_out_config.mutable_stream_out();
59+
auto* udp_config = stream_out_proto->mutable_udp_unicast();
60+
udp_config->set_destination_address(dest_addr);
61+
udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT);
62+
stream_out_proto->set_label("Broadband Stream");
63+
64+
std::shared_ptr<Node> stream_out_node;
65+
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
66+
if (!s.ok()) return s;
67+
68+
*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
69+
if (!*stream_out_ptr) {
70+
return { science::StatusCode::kInternal, "failed to cast stream out node" };
71+
}
5472

5573
s = config.add_node(broadband_source);
5674
if (!s.ok()) return s;
@@ -64,7 +82,10 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
6482
s = device.configure(&config);
6583
if (!s.ok()) return s;
6684

85+
std::cout << "Configured device" << std::endl;
86+
6787
s = device.start();
88+
std::cout << "Started device" << std::endl;
6889

6990
return s;
7091
}
@@ -75,18 +96,17 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
7596
}
7697

7798
science::Status s;
78-
7999
DeviceInfo info;
80100
s = device.info(&info);
81101
if (!s.ok()) return s;
82102

83-
uint32_t stream_out_id = 0; // default id
84-
std::string group;
103+
uint32_t stream_out_id = 0;
104+
NodeConfig stream_out_config;
85105
const auto& nodes = info.configuration().nodes();
86106
for (const auto& node : nodes) {
87107
if (node.type() == NodeType::kStreamOut) {
88108
stream_out_id = node.id();
89-
group = node.stream_out().multicast_group();
109+
stream_out_config = node;
90110
break;
91111
}
92112
}
@@ -95,9 +115,16 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
95115
return { science::StatusCode::kNotFound, "no stream out node found" };
96116
}
97117

98-
std::cout << "found stream out node with id " << stream_out_id << " and group " << group << std::endl;
118+
std::shared_ptr<Node> stream_out_node;
119+
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
120+
if (!s.ok()) return s;
99121

100-
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
122+
*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
123+
if (!*stream_out_ptr) {
124+
return { science::StatusCode::kInternal, "failed to cast stream out node" };
125+
}
126+
127+
std::cout << "found stream out node with id " << stream_out_id << std::endl;
101128

102129
Config config;
103130
s = config.add_node(*stream_out_ptr, stream_out_id);
@@ -107,15 +134,15 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
107134
if (!s.ok()) return s;
108135

109136
return s;
110-
}
137+
}
111138

112139
auto stream(const std::string& uri, bool configure) -> int {
113140
synapse::Device device(uri);
114141
science::Status s;
115142

116143
std::shared_ptr<synapse::StreamOut> stream_out;
117144
if (configure) {
118-
s = stream_new(device, &stream_out);
145+
s = stream_new(device, &stream_out, "127.0.0.1");
119146
if (!s.ok()) {
120147
std::cout << "error configuring stream out node: ("
121148
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
@@ -136,6 +163,7 @@ auto stream(const std::string& uri, bool configure) -> int {
136163
return 1;
137164
}
138165

166+
std::cout << "Reading..." << std::endl;
139167
while (true) {
140168
SynapseData out;
141169
s = stream_out->read(&out);
@@ -185,16 +213,24 @@ auto stream(const std::string& uri, bool configure) -> int {
185213
std::cout << ss.str() << std::endl;
186214
}
187215
} else {
188-
std::cout << "data type unknown" << std::endl;
216+
std::cout << "received data of unknown type" << std::endl;
189217
}
190218
}
191219

192220
return 1;
193221
}
194222

195223
int main(int argc, char** argv) {
196-
std::string uri = "192.168.0.1:647";
197-
stream(uri, false);
224+
if (argc != 2 && argc != 3) {
225+
std::cout << "Usage: " << argv[0] << " <uri> [--config]" << std::endl;
226+
std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl;
227+
std::cout << " --config: optional flag to configure a new stream" << std::endl;
228+
std::cout << " if omitted, uses existing stream" << std::endl;
229+
return 1;
230+
}
198231

199-
return 0;
232+
std::string uri = argv[1];
233+
bool configure = (argc == 3 && std::string(argv[2]) == "--config");
234+
235+
return stream(uri, configure);
200236
}

include/science/synapse/nodes/stream_out.h

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,17 @@
88
#include "science/libndtp/types.h"
99
#include "science/scipp/status.h"
1010
#include "science/synapse/api/nodes/stream_out.pb.h"
11-
#include "science/synapse/nodes/udp_node.h"
11+
#include "science/synapse/node.h"
1212

1313
namespace synapse {
1414

15-
class StreamOut : public UdpNode {
15+
class StreamOut : public Node {
1616
public:
17-
StreamOut(const std::string& label, const std::string& multicast_group);
17+
static constexpr uint16_t DEFAULT_STREAM_OUT_PORT = 50038;
18+
StreamOut(const std::string& destination_address = "",
19+
uint16_t destination_port = DEFAULT_STREAM_OUT_PORT,
20+
const std::string& label = "");
21+
~StreamOut();
1822

1923
auto read(science::libndtp::SynapseData* out) -> science::Status;
2024

@@ -27,11 +31,14 @@ class StreamOut : public UdpNode {
2731
auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override;
2832

2933
private:
30-
const std::string label_;
31-
const std::string multicast_group_;
34+
std::string destination_address_;
35+
uint16_t destination_port_;
36+
std::string label_;
37+
int socket_ = 0;
38+
std::optional<sockaddr_in> addr_;
3239

40+
static constexpr uint32_t SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024; // 5MB
3341
auto init() -> science::Status;
34-
auto get_host(std::string* host) -> science::Status override;
3542
};
3643

3744
} // namespace synapse

src/science/synapse/nodes/broadband_source.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ auto BroadbandSource::p_to_proto(synapse::NodeConfig* proto) -> science::Status
5858
config->set_gain(gain_);
5959

6060
auto s = signal_.to_proto(config->mutable_signal());
61+
std::cout << "BroadbandSource::p_to_proto: " << config->DebugString() << std::endl;
6162

6263
return s;
6364
}

0 commit comments

Comments
 (0)