Skip to content

Commit 4c27183

Browse files
committed
feat: switch StreamOut node to UDP unicast; update synapse-api
BREAKING CHANGE: support UDP unicast in StreamOut node, remove UDP multicast
1 parent 2fccee8 commit 4c27183

10 files changed

Lines changed: 431 additions & 80 deletions

File tree

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
VCPKG_MANIFEST_FEATURES ?= examples\;tests
2+
13
.PHONY: all
24
all: clean configure build
35

examples/stats/main.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ auto configure_stream(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr
5656

5757
NodeConfig stream_out_config;
5858
auto* stream_out_proto = stream_out_config.mutable_stream_out();
59-
stream_out_proto->set_multicast_group("224.0.0.115");
59+
auto* udp_config = stream_out_proto->mutable_udp_unicast();
60+
udp_config->set_destination_address("127.0.0.1");
61+
udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT);
6062

6163
std::shared_ptr<Node> stream_out_node;
6264
s = StreamOut::from_proto(stream_out_config, &stream_out_node);

examples/stream_out/main.cpp

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <memory>
2+
#include <chrono>
23

34
#include "science/scipp/status.h"
45
#include "science/synapse/channel.h"
@@ -18,14 +19,16 @@ using synapse::NodeType;
1819
using synapse::Signal;
1920
using synapse::StreamOut;
2021
using synapse::SynapseData;
22+
using synapse::NodeConfig;
23+
using synapse::Node;
2124

2225

23-
auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> science::Status {
26+
auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr, const std::string& dest_addr) -> science::Status {
27+
const uint32_t N_CHANNELS = 10;
2428
if (stream_out_ptr == nullptr) {
2529
return { science::StatusCode::kInvalidArgument, "stream out pointer is null" };
2630
}
2731

28-
std::string group = "224.0.0.10";
2932
science::Status s;
3033
DeviceInfo info;
3134
s = device.info(&info);
@@ -39,8 +42,8 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
3942
}
4043
};
4144
auto& electrodes = std::get<Electrodes>(signal.signal);
42-
electrodes.channels.reserve(19);
43-
for (unsigned int i = 0; i < 19; i++) {
45+
electrodes.channels.reserve(N_CHANNELS);
46+
for (unsigned int i = 0; i < N_CHANNELS; i++) {
4447
electrodes.channels.push_back(Ch{
4548
.id = i,
4649
.electrode_id = i * 2,
@@ -50,7 +53,23 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
5053

5154
Config config;
5255
auto broadband_source = std::make_shared<synapse::BroadbandSource>(1, 16, 30000, 20.0, signal);
53-
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
56+
57+
// Create StreamOut with explicit configuration
58+
NodeConfig stream_out_config;
59+
auto* stream_out_proto = stream_out_config.mutable_stream_out();
60+
auto* udp_config = stream_out_proto->mutable_udp_unicast();
61+
udp_config->set_destination_address(dest_addr);
62+
udp_config->set_destination_port(StreamOut::DEFAULT_STREAM_OUT_PORT);
63+
stream_out_proto->set_label("Broadband Stream");
64+
65+
std::shared_ptr<Node> stream_out_node;
66+
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
67+
if (!s.ok()) return s;
68+
69+
*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
70+
if (!*stream_out_ptr) {
71+
return { science::StatusCode::kInternal, "failed to cast stream out node" };
72+
}
5473

5574
s = config.add_node(broadband_source);
5675
if (!s.ok()) return s;
@@ -64,7 +83,10 @@ auto stream_new(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr) -> s
6483
s = device.configure(&config);
6584
if (!s.ok()) return s;
6685

86+
std::cout << "Configured device" << std::endl;
87+
6788
s = device.start();
89+
std::cout << "Started device" << std::endl;
6890

6991
return s;
7092
}
@@ -75,18 +97,17 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
7597
}
7698

7799
science::Status s;
78-
79100
DeviceInfo info;
80101
s = device.info(&info);
81102
if (!s.ok()) return s;
82103

83-
uint32_t stream_out_id = 0; // default id
84-
std::string group;
104+
uint32_t stream_out_id = 0;
105+
NodeConfig stream_out_config;
85106
const auto& nodes = info.configuration().nodes();
86107
for (const auto& node : nodes) {
87108
if (node.type() == NodeType::kStreamOut) {
88109
stream_out_id = node.id();
89-
group = node.stream_out().multicast_group();
110+
stream_out_config = node;
90111
break;
91112
}
92113
}
@@ -95,9 +116,16 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
95116
return { science::StatusCode::kNotFound, "no stream out node found" };
96117
}
97118

98-
std::cout << "found stream out node with id " << stream_out_id << " and group " << group << std::endl;
119+
std::shared_ptr<Node> stream_out_node;
120+
s = StreamOut::from_proto(stream_out_config, &stream_out_node);
121+
if (!s.ok()) return s;
122+
123+
*stream_out_ptr = std::dynamic_pointer_cast<StreamOut>(stream_out_node);
124+
if (!*stream_out_ptr) {
125+
return { science::StatusCode::kInternal, "failed to cast stream out node" };
126+
}
99127

100-
*stream_out_ptr = std::make_shared<synapse::StreamOut>("out", group);
128+
std::cout << "found stream out node with id " << stream_out_id << std::endl;
101129

102130
Config config;
103131
s = config.add_node(*stream_out_ptr, stream_out_id);
@@ -107,21 +135,20 @@ auto stream_existing(Device& device, std::shared_ptr<StreamOut>* stream_out_ptr)
107135
if (!s.ok()) return s;
108136

109137
return s;
110-
}
138+
}
111139

112140
auto stream(const std::string& uri, bool configure) -> int {
113141
synapse::Device device(uri);
114142
science::Status s;
115143

116144
std::shared_ptr<synapse::StreamOut> stream_out;
117145
if (configure) {
118-
s = stream_new(device, &stream_out);
146+
s = stream_new(device, &stream_out, "127.0.0.1");
119147
if (!s.ok()) {
120148
std::cout << "error configuring stream out node: ("
121149
<< static_cast<int>(s.code()) << ") " << s.message() << std::endl;
122150
return 1;
123151
}
124-
125152
} else {
126153
s = stream_existing(device, &stream_out);
127154
if (!s.ok()) {
@@ -136,6 +163,7 @@ auto stream(const std::string& uri, bool configure) -> int {
136163
return 1;
137164
}
138165

166+
std::cout << "Reading..." << std::endl;
139167
while (true) {
140168
SynapseData out;
141169
s = stream_out->read(&out);
@@ -185,16 +213,24 @@ auto stream(const std::string& uri, bool configure) -> int {
185213
std::cout << ss.str() << std::endl;
186214
}
187215
} else {
188-
std::cout << "data type unknown" << std::endl;
216+
std::cout << "received data of unknown type" << std::endl;
189217
}
190218
}
191219

192220
return 1;
193221
}
194222

195223
int main(int argc, char** argv) {
196-
std::string uri = "192.168.0.1:647";
197-
stream(uri, false);
224+
if (argc != 2 && argc != 3) {
225+
std::cout << "Usage: " << argv[0] << " <uri> [--config]" << std::endl;
226+
std::cout << " uri: device URI (e.g., 192.168.0.1:647)" << std::endl;
227+
std::cout << " --config: optional flag to configure a new stream" << std::endl;
228+
std::cout << " if omitted, uses existing stream" << std::endl;
229+
return 1;
230+
}
198231

199-
return 0;
232+
std::string uri = argv[1];
233+
bool configure = (argc == 3 && std::string(argv[2]) == "--config");
234+
235+
return stream(uri, configure);
200236
}

include/science/synapse/nodes/stream_out.h

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,19 @@
88
#include "science/libndtp/types.h"
99
#include "science/scipp/status.h"
1010
#include "science/synapse/api/nodes/stream_out.pb.h"
11-
#include "science/synapse/nodes/udp_node.h"
11+
#include "science/synapse/node.h"
1212

1313
namespace synapse {
1414

15-
class StreamOut : public UdpNode {
15+
class StreamOut : public Node {
1616
public:
17-
StreamOut(const std::string& label, const std::string& multicast_group);
17+
static constexpr uint16_t DEFAULT_STREAM_OUT_PORT = 50038;
18+
StreamOut(const std::string& destination_address = "",
19+
uint16_t destination_port = DEFAULT_STREAM_OUT_PORT,
20+
const std::string& label = "");
21+
~StreamOut();
1822

23+
auto init() -> science::Status;
1924
auto read(science::libndtp::SynapseData* out, science::libndtp::NDTPHeader* header = nullptr, size_t* bytes_read = nullptr) -> science::Status;
2025

2126
[[nodiscard]] static auto from_proto(
@@ -27,11 +32,13 @@ class StreamOut : public UdpNode {
2732
auto p_to_proto(synapse::NodeConfig* proto) -> science::Status override;
2833

2934
private:
30-
const std::string label_;
31-
const std::string multicast_group_;
35+
std::string destination_address_;
36+
uint16_t destination_port_;
37+
std::string label_;
38+
int socket_ = 0;
39+
std::optional<sockaddr_in> addr_;
3240

33-
auto init() -> science::Status;
34-
auto get_host(std::string* host) -> science::Status override;
41+
static constexpr uint32_t SOCKET_BUFSIZE_BYTES = 5 * 1024 * 1024; // 5MB
3542
};
3643

3744
} // namespace synapse

src/science/synapse/nodes/broadband_source.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ auto BroadbandSource::p_to_proto(synapse::NodeConfig* proto) -> science::Status
5858
config->set_gain(gain_);
5959

6060
auto s = signal_.to_proto(config->mutable_signal());
61+
std::cout << "BroadbandSource::p_to_proto: " << config->DebugString() << std::endl;
6162

6263
return s;
6364
}

0 commit comments

Comments
 (0)