diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index 85837648e9..8f89251c82 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -183,7 +183,7 @@ class CallbackGroup const std::function & service_func, const std::function & client_func, const std::function & timer_func, - const std::function & waitable_func) const; + const std::function & waitable_func); /// Return a reference to the 'associated with executor' atomic boolean. /** diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index b26a63e650..cb459df30c 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -73,44 +73,69 @@ void CallbackGroup::collect_all_ptrs( const std::function & service_func, const std::function & client_func, const std::function & timer_func, - const std::function & waitable_func) const + const std::function & waitable_func) { std::lock_guard lock(mutex_); - for (const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) { - rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock(); + for (const auto & weak_ptr : subscription_ptrs_) { + auto ref_ptr = weak_ptr.lock(); if (ref_ptr) { sub_func(ref_ptr); } } + subscription_ptrs_.erase( + std::remove_if( + subscription_ptrs_.begin(), subscription_ptrs_.end(), + [](const auto & w) {return w.expired();}), + subscription_ptrs_.end()); - for (const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) { - rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock(); + for (const auto & weak_ptr : service_ptrs_) { + auto ref_ptr = weak_ptr.lock(); if (ref_ptr) { service_func(ref_ptr); } } + service_ptrs_.erase( + std::remove_if( + service_ptrs_.begin(), service_ptrs_.end(), + [](const auto & w) {return w.expired();}), + service_ptrs_.end()); - for (const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) { - rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock(); + for (const auto & weak_ptr : client_ptrs_) { + auto ref_ptr = weak_ptr.lock(); if (ref_ptr) { client_func(ref_ptr); } } + client_ptrs_.erase( + std::remove_if( + client_ptrs_.begin(), client_ptrs_.end(), + [](const auto & w) {return w.expired();}), + client_ptrs_.end()); - for (const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) { - rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock(); + for (const auto & weak_ptr : timer_ptrs_) { + auto ref_ptr = weak_ptr.lock(); if (ref_ptr) { timer_func(ref_ptr); } } + timer_ptrs_.erase( + std::remove_if( + timer_ptrs_.begin(), timer_ptrs_.end(), + [](const auto & w) {return w.expired();}), + timer_ptrs_.end()); - for (const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) { - rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock(); + for (const auto & weak_ptr : waitable_ptrs_) { + auto ref_ptr = weak_ptr.lock(); if (ref_ptr) { waitable_func(ref_ptr); } } + waitable_ptrs_.erase( + std::remove_if( + waitable_ptrs_.begin(), waitable_ptrs_.end(), + [](const auto & w) {return w.expired();}), + waitable_ptrs_.end()); } std::atomic_bool & @@ -154,12 +179,6 @@ CallbackGroup::add_subscription( { std::lock_guard lock(mutex_); subscription_ptrs_.push_back(subscription_ptr); - subscription_ptrs_.erase( - std::remove_if( - subscription_ptrs_.begin(), - subscription_ptrs_.end(), - [](const rclcpp::SubscriptionBase::WeakPtr & x) {return x.expired();}), - subscription_ptrs_.end()); } void @@ -167,12 +186,6 @@ CallbackGroup::add_timer(const rclcpp::TimerBase::SharedPtr & timer_ptr) { std::lock_guard lock(mutex_); timer_ptrs_.push_back(timer_ptr); - timer_ptrs_.erase( - std::remove_if( - timer_ptrs_.begin(), - timer_ptrs_.end(), - [](const rclcpp::TimerBase::WeakPtr & x) {return x.expired();}), - timer_ptrs_.end()); } void @@ -180,12 +193,6 @@ CallbackGroup::add_service(const rclcpp::ServiceBase::SharedPtr & service_ptr) { std::lock_guard lock(mutex_); service_ptrs_.push_back(service_ptr); - service_ptrs_.erase( - std::remove_if( - service_ptrs_.begin(), - service_ptrs_.end(), - [](const rclcpp::ServiceBase::WeakPtr & x) {return x.expired();}), - service_ptrs_.end()); } void @@ -193,12 +200,6 @@ CallbackGroup::add_client(const rclcpp::ClientBase::SharedPtr & client_ptr) { std::lock_guard lock(mutex_); client_ptrs_.push_back(client_ptr); - client_ptrs_.erase( - std::remove_if( - client_ptrs_.begin(), - client_ptrs_.end(), - [](const rclcpp::ClientBase::WeakPtr & x) {return x.expired();}), - client_ptrs_.end()); } void @@ -206,12 +207,6 @@ CallbackGroup::add_waitable(const rclcpp::Waitable::SharedPtr & waitable_ptr) { std::lock_guard lock(mutex_); waitable_ptrs_.push_back(waitable_ptr); - waitable_ptrs_.erase( - std::remove_if( - waitable_ptrs_.begin(), - waitable_ptrs_.end(), - [](const rclcpp::Waitable::WeakPtr & x) {return x.expired();}), - waitable_ptrs_.end()); } void diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 5c6469ec69..6ffdde010a 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -36,6 +36,10 @@ if(TARGET test_exceptions) target_link_libraries(test_exceptions ${PROJECT_NAME} mimick) endif() +ament_add_ros_isolated_gtest(test_callback_group_entity_cleanup test_callback_group_entity_cleanup.cpp) +if(TARGET test_callback_group_entity_cleanup) + target_link_libraries(test_callback_group_entity_cleanup ${PROJECT_NAME} ${test_msgs_TARGETS}) +endif() ament_add_ros_isolated_gtest(test_allocator_memory_strategy strategies/test_allocator_memory_strategy.cpp) if(TARGET test_allocator_memory_strategy) target_link_libraries(test_allocator_memory_strategy ${PROJECT_NAME} rcpputils::rcpputils ${test_msgs_TARGETS}) @@ -438,7 +442,6 @@ ament_add_ros_isolated_gtest(test_time_source test_time_source.cpp if(TARGET test_time_source) target_link_libraries(test_time_source ${PROJECT_NAME} rcl::rcl) endif() - ament_add_ros_isolated_gtest(test_utilities test_utilities.cpp APPEND_LIBRARY_DIRS "${append_library_dirs}") ament_add_test_label(test_utilities mimick) diff --git a/rclcpp/test/rclcpp/test_callback_group_entity_cleanup.cpp b/rclcpp/test/rclcpp/test_callback_group_entity_cleanup.cpp new file mode 100644 index 0000000000..09413d41bc --- /dev/null +++ b/rclcpp/test/rclcpp/test_callback_group_entity_cleanup.cpp @@ -0,0 +1,264 @@ +// Copyright 2026 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Integration tests for CallbackGroup entity cleanup. + * + * Verifies that expired weak_ptr entries are properly cleaned up when + * cleanup is deferred to collect_all_ptrs (instead of running on every + * add_* call). + */ + +#include +#include +#include + +#include "gtest/gtest.h" + +#include "rclcpp/rclcpp.hpp" +#include "test_msgs/msg/empty.hpp" +#include "test_msgs/srv/empty.hpp" + +using namespace std::chrono_literals; + +class TestCallbackGroupEntityCleanup : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() override + { + node_ = std::make_shared("test_node", "/test_ns"); + } + + void TearDown() override + { + node_.reset(); + } + + /// Trigger collect_all_ptrs to run cleanup on the given callback group. + void trigger_collection(rclcpp::CallbackGroup::SharedPtr cbg) + { + cbg->collect_all_ptrs( + [](const rclcpp::SubscriptionBase::SharedPtr &) {}, + [](const rclcpp::ServiceBase::SharedPtr &) {}, + [](const rclcpp::ClientBase::SharedPtr &) {}, + [](const rclcpp::TimerBase::SharedPtr &) {}, + [](const rclcpp::Waitable::SharedPtr &) {}); + } + + rclcpp::Node::SharedPtr node_; +}; + +/// After adding and then destroying timers, collect_all_ptrs should +/// compact the expired entries, reducing size(). +TEST_F(TestCallbackGroupEntityCleanup, expired_timers_cleaned_on_collect) +{ + auto cbg = node_->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + // Add 100 timers + std::vector timers; + for (int i = 0; i < 100; ++i) { + timers.push_back(node_->create_wall_timer(1h, []() {}, cbg)); + } + EXPECT_EQ(cbg->size(), 100u); + + // Destroy half the timers (the weak_ptrs in the callback group expire) + timers.erase(timers.begin(), timers.begin() + 50); + + // Before collection, size() still reports 100 (expired entries remain) + EXPECT_EQ(cbg->size(), 100u); + + // Trigger collect_all_ptrs — should compact expired entries + trigger_collection(cbg); + + // After collection, only the 50 live timers should remain + EXPECT_EQ(cbg->size(), 50u); +} + +/// After adding and destroying subscriptions, collect_all_ptrs should +/// compact the expired entries. Each subscription may register +/// additional entities (e.g. event waitables), so we use relative sizes. +TEST_F(TestCallbackGroupEntityCleanup, expired_subscriptions_cleaned_on_collect) +{ + auto cbg = node_->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + std::vector subs; + for (int i = 0; i < 20; ++i) { + rclcpp::SubscriptionOptions opts; + opts.callback_group = cbg; + subs.push_back( + node_->create_subscription( + "topic_" + std::to_string(i), 10, + [](test_msgs::msg::Empty::ConstSharedPtr) {}, opts)); + } + const size_t size_with_all = cbg->size(); + EXPECT_GT(size_with_all, 0u); + + // Destroy all subscriptions — expired entries remain until collection + subs.clear(); + EXPECT_EQ(cbg->size(), size_with_all); + + trigger_collection(cbg); + + EXPECT_EQ(cbg->size(), 0u); // all cleaned up +} + +/// collect_all_ptrs should only yield live entities and skip expired ones. +TEST_F(TestCallbackGroupEntityCleanup, collect_only_yields_live_entities) +{ + auto cbg = node_->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + // Add 10 timers + std::vector timers; + for (int i = 0; i < 10; ++i) { + timers.push_back(node_->create_wall_timer(1h, []() {}, cbg)); + } + + // Destroy odd-indexed timers (0-based: 1, 3, 5, 7, 9) + for (int i = 9; i >= 1; i -= 2) { + timers.erase(timers.begin() + i); + } + ASSERT_EQ(timers.size(), 5u); + + // Collect and count live timers + size_t collected_count = 0; + cbg->collect_all_ptrs( + [](const rclcpp::SubscriptionBase::SharedPtr &) {}, + [](const rclcpp::ServiceBase::SharedPtr &) {}, + [](const rclcpp::ClientBase::SharedPtr &) {}, + [&collected_count](const rclcpp::TimerBase::SharedPtr &) { + ++collected_count; + }, + [](const rclcpp::Waitable::SharedPtr &) {}); + + EXPECT_EQ(collected_count, 5u); + EXPECT_EQ(cbg->size(), 5u); // compacted +} + +/// Adding many entities should not exhibit quadratic slowdown. +/// This test adds N timers and asserts the time is well under the +/// quadratic threshold. +TEST_F(TestCallbackGroupEntityCleanup, add_many_timers_is_not_quadratic) +{ + auto cbg = node_->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + const size_t N = 5000; + std::vector timers; + timers.reserve(N); + + auto start = std::chrono::steady_clock::now(); + for (size_t i = 0; i < N; ++i) { + timers.push_back(node_->create_wall_timer(1h, []() {}, cbg)); + } + auto elapsed = std::chrono::steady_clock::now() - start; + auto ms = std::chrono::duration_cast(elapsed).count(); + + EXPECT_EQ(cbg->size(), N); + + // With O(N²) the old code took ~429ms for 10K timers. + // With O(N) we expect < 100ms for 5K timers even on slow CI. + // Use a generous threshold to avoid flakiness. + EXPECT_LT(ms, 5000) << "Adding " << N << " timers took " << ms + << "ms — possible quadratic regression"; +} + +/// After interleaved add/remove cycles, collect_all_ptrs should +/// report exactly the live entities. +TEST_F(TestCallbackGroupEntityCleanup, interleaved_add_remove_cycles) +{ + auto cbg = node_->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + std::vector timers; + + // Cycle 1: add 20, remove 10 + for (int i = 0; i < 20; ++i) { + timers.push_back(node_->create_wall_timer(1h, []() {}, cbg)); + } + timers.erase(timers.begin(), timers.begin() + 10); + + // Cycle 2: add 30 more + for (int i = 0; i < 30; ++i) { + timers.push_back(node_->create_wall_timer(1h, []() {}, cbg)); + } + + // Cycle 3: remove 15 + timers.erase(timers.begin(), timers.begin() + 15); + + // Trigger cleanup + size_t live = 0; + cbg->collect_all_ptrs( + [](const rclcpp::SubscriptionBase::SharedPtr &) {}, + [](const rclcpp::ServiceBase::SharedPtr &) {}, + [](const rclcpp::ClientBase::SharedPtr &) {}, + [&live](const rclcpp::TimerBase::SharedPtr &) {++live;}, + [](const rclcpp::Waitable::SharedPtr &) {}); + + EXPECT_EQ(live, timers.size()); + EXPECT_EQ(cbg->size(), timers.size()); +} + +/// Mixed entity types: timers + subscriptions, verify independent cleanup. +/// Subscriptions may register extra entities (event waitables), so we +/// track relative sizes. +TEST_F(TestCallbackGroupEntityCleanup, mixed_entity_types_cleanup) +{ + auto cbg = node_->create_callback_group( + rclcpp::CallbackGroupType::MutuallyExclusive); + + // Add timers and subscriptions + std::vector timers; + std::vector subs; + const int N = 10; + for (int i = 0; i < N; ++i) { + timers.push_back(node_->create_wall_timer(1h, []() {}, cbg)); + } + const size_t size_timers_only = cbg->size(); + EXPECT_EQ(size_timers_only, static_cast(N)); + + for (int i = 0; i < N; ++i) { + rclcpp::SubscriptionOptions opts; + opts.callback_group = cbg; + subs.push_back( + node_->create_subscription( + "mix_topic_" + std::to_string(i), 10, + [](test_msgs::msg::Empty::ConstSharedPtr) {}, opts)); + } + const size_t size_both = cbg->size(); + const size_t sub_entities = size_both - size_timers_only; + EXPECT_GT(sub_entities, 0u); + + // Destroy all timers, keep subscriptions + timers.clear(); + trigger_collection(cbg); + EXPECT_EQ(cbg->size(), sub_entities); + + // Destroy all subscriptions + subs.clear(); + trigger_collection(cbg); + EXPECT_EQ(cbg->size(), 0u); +}