Skip to content

Commit bb770e9

Browse files
Fix O(N²) entity addition in CallbackGroup
The add_* methods (add_timer, add_subscription, etc.) performed a full linear scan to remove expired weak_ptrs on every call. When adding N entities, this resulted in O(N²) total operations. Move expired-entry cleanup from add_* methods into collect_all_ptrs, which already iterates all entries and is called regularly by the executor. The cleanup is done in a single compaction pass during collection, so it adds no extra cost. Entity vectors are marked mutable to allow cleanup within the const collect_all_ptrs method. Fixes #2942 Signed-off-by: Pavel Guzenfeld <me@pavelguzenfeld.com> Signed-off-by: Pavel Guzenfeld <pavelguzenfeld@gmail.com>
1 parent af78e01 commit bb770e9

2 files changed

Lines changed: 34 additions & 69 deletions

File tree

rclcpp/include/rclcpp/callback_group.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,11 @@ class CallbackGroup
258258
// Mutex to protect the subsequent vectors of pointers.
259259
mutable std::mutex mutex_;
260260
std::atomic_bool associated_with_executor_;
261-
std::vector<rclcpp::SubscriptionBase::WeakPtr> subscription_ptrs_;
262-
std::vector<rclcpp::TimerBase::WeakPtr> timer_ptrs_;
263-
std::vector<rclcpp::ServiceBase::WeakPtr> service_ptrs_;
264-
std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
265-
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
261+
mutable std::vector<rclcpp::SubscriptionBase::WeakPtr> subscription_ptrs_;
262+
mutable std::vector<rclcpp::TimerBase::WeakPtr> timer_ptrs_;
263+
mutable std::vector<rclcpp::ServiceBase::WeakPtr> service_ptrs_;
264+
mutable std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
265+
mutable std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
266266
std::atomic_bool can_be_taken_from_;
267267
const bool automatically_add_to_executor_with_node_;
268268
// defer the creation of the guard condition

rclcpp/src/rclcpp/callback_group.cpp

Lines changed: 29 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,30 @@ CallbackGroup::size() const
6868
waitable_ptrs_.size();
6969
}
7070

71+
namespace
72+
{
73+
/// Iterate a vector of weak pointers, call func for each live entry,
74+
/// and compact expired entries out of the vector in a single pass.
75+
template<typename T, typename Func>
76+
void collect_and_compact(
77+
std::vector<typename T::WeakPtr> & ptrs,
78+
const Func & func)
79+
{
80+
size_t write_idx = 0;
81+
for (size_t read_idx = 0; read_idx < ptrs.size(); ++read_idx) {
82+
auto ref_ptr = ptrs[read_idx].lock();
83+
if (ref_ptr) {
84+
func(ref_ptr);
85+
if (write_idx != read_idx) {
86+
ptrs[write_idx] = std::move(ptrs[read_idx]);
87+
}
88+
++write_idx;
89+
}
90+
}
91+
ptrs.resize(write_idx);
92+
}
93+
} // namespace
94+
7195
void CallbackGroup::collect_all_ptrs(
7296
const std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> & sub_func,
7397
const std::function<void(const rclcpp::ServiceBase::SharedPtr &)> & service_func,
@@ -77,40 +101,11 @@ void CallbackGroup::collect_all_ptrs(
77101
{
78102
std::lock_guard<std::mutex> lock(mutex_);
79103

80-
for (const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) {
81-
rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock();
82-
if (ref_ptr) {
83-
sub_func(ref_ptr);
84-
}
85-
}
86-
87-
for (const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) {
88-
rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock();
89-
if (ref_ptr) {
90-
service_func(ref_ptr);
91-
}
92-
}
93-
94-
for (const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) {
95-
rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock();
96-
if (ref_ptr) {
97-
client_func(ref_ptr);
98-
}
99-
}
100-
101-
for (const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) {
102-
rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock();
103-
if (ref_ptr) {
104-
timer_func(ref_ptr);
105-
}
106-
}
107-
108-
for (const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) {
109-
rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock();
110-
if (ref_ptr) {
111-
waitable_func(ref_ptr);
112-
}
113-
}
104+
collect_and_compact<rclcpp::SubscriptionBase>(subscription_ptrs_, sub_func);
105+
collect_and_compact<rclcpp::ServiceBase>(service_ptrs_, service_func);
106+
collect_and_compact<rclcpp::ClientBase>(client_ptrs_, client_func);
107+
collect_and_compact<rclcpp::TimerBase>(timer_ptrs_, timer_func);
108+
collect_and_compact<rclcpp::Waitable>(waitable_ptrs_, waitable_func);
114109
}
115110

116111
std::atomic_bool &
@@ -154,64 +149,34 @@ CallbackGroup::add_subscription(
154149
{
155150
std::lock_guard<std::mutex> lock(mutex_);
156151
subscription_ptrs_.push_back(subscription_ptr);
157-
subscription_ptrs_.erase(
158-
std::remove_if(
159-
subscription_ptrs_.begin(),
160-
subscription_ptrs_.end(),
161-
[](const rclcpp::SubscriptionBase::WeakPtr & x) {return x.expired();}),
162-
subscription_ptrs_.end());
163152
}
164153

165154
void
166155
CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr & timer_ptr)
167156
{
168157
std::lock_guard<std::mutex> lock(mutex_);
169158
timer_ptrs_.push_back(timer_ptr);
170-
timer_ptrs_.erase(
171-
std::remove_if(
172-
timer_ptrs_.begin(),
173-
timer_ptrs_.end(),
174-
[](const rclcpp::TimerBase::WeakPtr & x) {return x.expired();}),
175-
timer_ptrs_.end());
176159
}
177160

178161
void
179162
CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr & service_ptr)
180163
{
181164
std::lock_guard<std::mutex> lock(mutex_);
182165
service_ptrs_.push_back(service_ptr);
183-
service_ptrs_.erase(
184-
std::remove_if(
185-
service_ptrs_.begin(),
186-
service_ptrs_.end(),
187-
[](const rclcpp::ServiceBase::WeakPtr & x) {return x.expired();}),
188-
service_ptrs_.end());
189166
}
190167

191168
void
192169
CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr & client_ptr)
193170
{
194171
std::lock_guard<std::mutex> lock(mutex_);
195172
client_ptrs_.push_back(client_ptr);
196-
client_ptrs_.erase(
197-
std::remove_if(
198-
client_ptrs_.begin(),
199-
client_ptrs_.end(),
200-
[](const rclcpp::ClientBase::WeakPtr & x) {return x.expired();}),
201-
client_ptrs_.end());
202173
}
203174

204175
void
205176
CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr & waitable_ptr)
206177
{
207178
std::lock_guard<std::mutex> lock(mutex_);
208179
waitable_ptrs_.push_back(waitable_ptr);
209-
waitable_ptrs_.erase(
210-
std::remove_if(
211-
waitable_ptrs_.begin(),
212-
waitable_ptrs_.end(),
213-
[](const rclcpp::Waitable::WeakPtr & x) {return x.expired();}),
214-
waitable_ptrs_.end());
215180
}
216181

217182
void

0 commit comments

Comments
 (0)