Skip to content

Commit d25878b

Browse files
Fix O(N²) entity addition in CallbackGroup
The add_* methods (add_timer, add_subscription, etc.) performed a full linear scan to remove expired weak_ptrs on every call. When adding N entities, this resulted in O(N²) total operations. Move expired-entry cleanup from add_* methods into collect_all_ptrs, which already iterates all entries and is called regularly by the executor. The cleanup is done in a single compaction pass during collection, so it adds no extra cost. Entity vectors are marked mutable to allow cleanup within the const collect_all_ptrs method. Fixes #2942 Signed-off-by: Pavel Guzenfeld <me@pavelguzenfeld.com>
1 parent dca23fe commit d25878b

2 files changed

Lines changed: 34 additions & 69 deletions

File tree

rclcpp/include/rclcpp/callback_group.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,11 @@ class CallbackGroup
223223
// Mutex to protect the subsequent vectors of pointers.
224224
mutable std::mutex mutex_;
225225
std::atomic_bool associated_with_executor_;
226-
std::vector<rclcpp::SubscriptionBase::WeakPtr> subscription_ptrs_;
227-
std::vector<rclcpp::TimerBase::WeakPtr> timer_ptrs_;
228-
std::vector<rclcpp::ServiceBase::WeakPtr> service_ptrs_;
229-
std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
230-
std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
226+
mutable std::vector<rclcpp::SubscriptionBase::WeakPtr> subscription_ptrs_;
227+
mutable std::vector<rclcpp::TimerBase::WeakPtr> timer_ptrs_;
228+
mutable std::vector<rclcpp::ServiceBase::WeakPtr> service_ptrs_;
229+
mutable std::vector<rclcpp::ClientBase::WeakPtr> client_ptrs_;
230+
mutable std::vector<rclcpp::Waitable::WeakPtr> waitable_ptrs_;
231231
std::atomic_bool can_be_taken_from_;
232232
const bool automatically_add_to_executor_with_node_;
233233
// defer the creation of the guard condition

rclcpp/src/rclcpp/callback_group.cpp

Lines changed: 29 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@ CallbackGroup::type() const
5454
return type_;
5555
}
5656

57+
namespace
58+
{
59+
/// Iterate a vector of weak pointers, call func for each live entry,
60+
/// and compact expired entries out of the vector in a single pass.
61+
template<typename T, typename Func>
62+
void collect_and_compact(
63+
std::vector<typename T::WeakPtr> & ptrs,
64+
const Func & func)
65+
{
66+
size_t write_idx = 0;
67+
for (size_t read_idx = 0; read_idx < ptrs.size(); ++read_idx) {
68+
auto ref_ptr = ptrs[read_idx].lock();
69+
if (ref_ptr) {
70+
func(ref_ptr);
71+
if (write_idx != read_idx) {
72+
ptrs[write_idx] = std::move(ptrs[read_idx]);
73+
}
74+
++write_idx;
75+
}
76+
}
77+
ptrs.resize(write_idx);
78+
}
79+
} // namespace
80+
5781
void CallbackGroup::collect_all_ptrs(
5882
std::function<void(const rclcpp::SubscriptionBase::SharedPtr &)> sub_func,
5983
std::function<void(const rclcpp::ServiceBase::SharedPtr &)> service_func,
@@ -63,40 +87,11 @@ void CallbackGroup::collect_all_ptrs(
6387
{
6488
std::lock_guard<std::mutex> lock(mutex_);
6589

66-
for (const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) {
67-
rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock();
68-
if (ref_ptr) {
69-
sub_func(ref_ptr);
70-
}
71-
}
72-
73-
for (const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) {
74-
rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock();
75-
if (ref_ptr) {
76-
service_func(ref_ptr);
77-
}
78-
}
79-
80-
for (const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) {
81-
rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock();
82-
if (ref_ptr) {
83-
client_func(ref_ptr);
84-
}
85-
}
86-
87-
for (const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) {
88-
rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock();
89-
if (ref_ptr) {
90-
timer_func(ref_ptr);
91-
}
92-
}
93-
94-
for (const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) {
95-
rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock();
96-
if (ref_ptr) {
97-
waitable_func(ref_ptr);
98-
}
99-
}
90+
collect_and_compact<rclcpp::SubscriptionBase>(subscription_ptrs_, sub_func);
91+
collect_and_compact<rclcpp::ServiceBase>(service_ptrs_, service_func);
92+
collect_and_compact<rclcpp::ClientBase>(client_ptrs_, client_func);
93+
collect_and_compact<rclcpp::TimerBase>(timer_ptrs_, timer_func);
94+
collect_and_compact<rclcpp::Waitable>(waitable_ptrs_, waitable_func);
10095
}
10196

10297
std::atomic_bool &
@@ -144,64 +139,34 @@ CallbackGroup::add_subscription(
144139
{
145140
std::lock_guard<std::mutex> lock(mutex_);
146141
subscription_ptrs_.push_back(subscription_ptr);
147-
subscription_ptrs_.erase(
148-
std::remove_if(
149-
subscription_ptrs_.begin(),
150-
subscription_ptrs_.end(),
151-
[](rclcpp::SubscriptionBase::WeakPtr x) {return x.expired();}),
152-
subscription_ptrs_.end());
153142
}
154143

155144
void
156145
CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr timer_ptr)
157146
{
158147
std::lock_guard<std::mutex> lock(mutex_);
159148
timer_ptrs_.push_back(timer_ptr);
160-
timer_ptrs_.erase(
161-
std::remove_if(
162-
timer_ptrs_.begin(),
163-
timer_ptrs_.end(),
164-
[](rclcpp::TimerBase::WeakPtr x) {return x.expired();}),
165-
timer_ptrs_.end());
166149
}
167150

168151
void
169152
CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr service_ptr)
170153
{
171154
std::lock_guard<std::mutex> lock(mutex_);
172155
service_ptrs_.push_back(service_ptr);
173-
service_ptrs_.erase(
174-
std::remove_if(
175-
service_ptrs_.begin(),
176-
service_ptrs_.end(),
177-
[](rclcpp::ServiceBase::WeakPtr x) {return x.expired();}),
178-
service_ptrs_.end());
179156
}
180157

181158
void
182159
CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr client_ptr)
183160
{
184161
std::lock_guard<std::mutex> lock(mutex_);
185162
client_ptrs_.push_back(client_ptr);
186-
client_ptrs_.erase(
187-
std::remove_if(
188-
client_ptrs_.begin(),
189-
client_ptrs_.end(),
190-
[](rclcpp::ClientBase::WeakPtr x) {return x.expired();}),
191-
client_ptrs_.end());
192163
}
193164

194165
void
195166
CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr waitable_ptr)
196167
{
197168
std::lock_guard<std::mutex> lock(mutex_);
198169
waitable_ptrs_.push_back(waitable_ptr);
199-
waitable_ptrs_.erase(
200-
std::remove_if(
201-
waitable_ptrs_.begin(),
202-
waitable_ptrs_.end(),
203-
[](rclcpp::Waitable::WeakPtr x) {return x.expired();}),
204-
waitable_ptrs_.end());
205170
}
206171

207172
void

0 commit comments

Comments
 (0)