Skip to content

Commit fc9f357

Browse files
committed
fix: Start streamers only in executor thread
1 parent 0b6944d commit fc9f357

2 files changed

Lines changed: 40 additions & 8 deletions

File tree

include/web_video_server/web_video_server.hpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,12 @@ class WebVideoServer : public rclcpp::Node
101101

102102
private:
103103
void restream_frames(std::chrono::duration<double> max_age);
104+
void activate_pending_streamers();
104105
void cleanup_inactive_streams();
106+
void resourse_management_timer_callback();
105107

106108
rclcpp::TimerBase::SharedPtr restream_timer_;
107-
rclcpp::TimerBase::SharedPtr cleanup_timer_;
109+
rclcpp::TimerBase::SharedPtr resource_management_timer_;
108110

109111
// Parameters
110112
double publish_rate_;
@@ -118,11 +120,13 @@ class WebVideoServer : public rclcpp::Node
118120
async_web_server_cpp::HttpRequestHandlerGroup handler_group_;
119121

120122
std::vector<std::shared_ptr<StreamerInterface>> streamers_;
123+
std::vector<std::shared_ptr<StreamerInterface>> pending_streamers_;
121124
pluginlib::ClassLoader<StreamerFactoryInterface> streamer_factory_loader_;
122125
std::map<std::string, std::shared_ptr<StreamerFactoryInterface>> streamer_factories_;
123126
pluginlib::ClassLoader<SnapshotStreamerFactoryInterface> snapshot_streamer_factory_loader_;
124127
std::map<std::string, std::shared_ptr<StreamerFactoryInterface>> snapshot_streamer_factories_;
125128
std::mutex streamers_mutex_;
129+
std::mutex pending_mutex_;
126130
};
127131

128132
} // namespace web_video_server

src/web_video_server.cpp

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <chrono>
3535
#include <cstring>
3636
#include <exception>
37+
#include <iterator>
3738
#include <map>
3839
#include <memory>
3940
#include <mutex>
@@ -147,7 +148,9 @@ WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options)
147148
[this]() {restream_frames(1s / publish_rate_);});
148149
}
149150

150-
cleanup_timer_ = create_wall_timer(500ms, [this]() {cleanup_inactive_streams();});
151+
resource_management_timer_ = create_wall_timer(100ms,
152+
[this]() {resourse_management_timer_callback();}
153+
);
151154

152155
server_->run();
153156
}
@@ -166,6 +169,27 @@ void WebVideoServer::restream_frames(std::chrono::duration<double> max_age)
166169
}
167170
}
168171

172+
void WebVideoServer::activate_pending_streamers()
173+
{
174+
std::vector<std::shared_ptr<StreamerInterface>> to_activate;
175+
{
176+
const std::scoped_lock lock(pending_mutex_);
177+
to_activate.swap(pending_streamers_);
178+
}
179+
180+
for (auto & streamer : to_activate) {
181+
streamer->start();
182+
}
183+
184+
if (!to_activate.empty()) {
185+
const std::scoped_lock lock(streamers_mutex_);
186+
streamers_.insert(
187+
streamers_.end(),
188+
std::make_move_iterator(to_activate.begin()),
189+
std::make_move_iterator(to_activate.end()));
190+
}
191+
}
192+
169193
void WebVideoServer::cleanup_inactive_streams()
170194
{
171195
const std::unique_lock lock(streamers_mutex_, std::try_to_lock);
@@ -182,6 +206,12 @@ void WebVideoServer::cleanup_inactive_streams()
182206
}
183207
}
184208

209+
void WebVideoServer::resourse_management_timer_callback()
210+
{
211+
activate_pending_streamers();
212+
cleanup_inactive_streams();
213+
}
214+
185215
bool WebVideoServer::handle_request(
186216
const async_web_server_cpp::HttpRequest & request,
187217
async_web_server_cpp::HttpConnectionPtr connection, const char * begin,
@@ -208,9 +238,8 @@ bool WebVideoServer::handle_stream(
208238
if (streamer_factories_.find(type) != streamer_factories_.end()) {
209239
const std::shared_ptr<StreamerInterface> streamer = streamer_factories_[type]->create_streamer(
210240
request, connection, weak_from_this());
211-
streamer->start();
212-
const std::scoped_lock lock(streamers_mutex_);
213-
streamers_.push_back(streamer);
241+
const std::scoped_lock lock(pending_mutex_);
242+
pending_streamers_.push_back(streamer);
214243
} else {
215244
async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found)(
216245
request, connection, begin, end);
@@ -228,9 +257,8 @@ bool WebVideoServer::handle_snapshot(
228257
const std::shared_ptr<StreamerInterface> streamer =
229258
snapshot_streamer_factories_[type]->create_streamer(
230259
request, connection, weak_from_this());
231-
streamer->start();
232-
const std::scoped_lock lock(streamers_mutex_);
233-
streamers_.push_back(streamer);
260+
const std::scoped_lock lock(pending_mutex_);
261+
pending_streamers_.push_back(streamer);
234262
} else {
235263
async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found)(
236264
request, connection, begin, end);

0 commit comments

Comments
 (0)