From 0eaf8dc295b50e526f027b1f4929292fdbaef85b Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Fri, 6 Mar 2026 12:13:25 -0500 Subject: [PATCH 1/3] impl(bigtable): add DynamicChannelPool class --- google/cloud/bigtable/CMakeLists.txt | 2 + .../bigtable/bigtable_client_unit_tests.bzl | 1 + .../bigtable/google_cloud_cpp_bigtable.bzl | 1 + .../cloud/bigtable/internal/channel_usage.h | 18 +- .../bigtable/internal/dynamic_channel_pool.h | 567 ++++++++++++++++++ .../internal/dynamic_channel_pool_test.cc | 353 +++++++++++ 6 files changed, 940 insertions(+), 2 deletions(-) create mode 100644 google/cloud/bigtable/internal/dynamic_channel_pool.h create mode 100644 google/cloud/bigtable/internal/dynamic_channel_pool_test.cc diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 5bd03c98ccacc..597d6bfa741ee 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -184,6 +184,7 @@ add_library( internal/default_row_reader.h internal/defaults.cc internal/defaults.h + internal/dynamic_channel_pool.h internal/endpoint_options.h internal/google_bytes_traits.cc internal/google_bytes_traits.h @@ -453,6 +454,7 @@ if (BUILD_TESTING) internal/data_tracing_connection_test.cc internal/default_row_reader_test.cc internal/defaults_test.cc + internal/dynamic_channel_pool_test.cc internal/google_bytes_traits_test.cc internal/logging_result_set_reader_test.cc internal/metrics_test.cc diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index bb166aa1556dc..2f0613b3c73aa 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -52,6 +52,7 @@ bigtable_client_unit_tests = [ "internal/data_tracing_connection_test.cc", "internal/default_row_reader_test.cc", "internal/defaults_test.cc", + "internal/dynamic_channel_pool_test.cc", "internal/google_bytes_traits_test.cc", 
"internal/logging_result_set_reader_test.cc", "internal/metrics_test.cc", diff --git a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl index a3e49011a272f..8240eba5a9fd5 100644 --- a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl +++ b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl @@ -90,6 +90,7 @@ google_cloud_cpp_bigtable_hdrs = [ "internal/data_tracing_connection.h", "internal/default_row_reader.h", "internal/defaults.h", + "internal/dynamic_channel_pool.h", "internal/endpoint_options.h", "internal/google_bytes_traits.h", "internal/logging_result_set_reader.h", diff --git a/google/cloud/bigtable/internal/channel_usage.h b/google/cloud/bigtable/internal/channel_usage.h index 5eff34af8aeaf..248750dc6033e 100644 --- a/google/cloud/bigtable/internal/channel_usage.h +++ b/google/cloud/bigtable/internal/channel_usage.h @@ -41,6 +41,17 @@ class ChannelUsage : public std::enable_shared_from_this> { std::make_shared()) : stub_(std::move(stub)), clock_(std::move(clock)) {} + // This constructor is only used in testing. + ChannelUsage(std::shared_ptr stub, std::shared_ptr clock, + int initial_outstanding_rpcs) + : stub_(std::move(stub)), + clock_(std::move(clock)), + outstanding_rpcs_(initial_outstanding_rpcs) { + std::cout << __func__ + << ": initial_outstanding_rpcs=" << initial_outstanding_rpcs + << std::endl; + } + // Computes the weighted average of outstanding RPCs on the channel over the // past 60 seconds. StatusOr average_outstanding_rpcs() { @@ -48,8 +59,9 @@ class ChannelUsage : public std::enable_shared_from_this> { auto constexpr kWindowDuration = std::chrono::seconds(kWindowSeconds); std::scoped_lock lk(mu_); if (!last_refresh_status_.ok()) return last_refresh_status_; - // If there are no measurements then the stub has never been used. - if (measurements_.empty()) return 0; + // If there are no measurements then the stub has never been used. In real + // use this will be 0. 
In testing we sometimes set an initial value. + if (measurements_.empty()) return outstanding_rpcs_; auto now = clock_->Now(); auto last_time = now; auto window_start = now - kWindowDuration; @@ -114,6 +126,8 @@ class ChannelUsage : public std::enable_shared_from_this> { std::shared_ptr AcquireStub() { std::scoped_lock lk(mu_); + std::cout << __func__ << ": outstanding_rpcs_=" << outstanding_rpcs_ + << std::endl; ++outstanding_rpcs_; auto time = clock_->Now(); measurements_.emplace_back(outstanding_rpcs_, time); diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h new file mode 100644 index 0000000000000..4e7de773cbfba --- /dev/null +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -0,0 +1,567 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H + +#include "google/cloud/bigtable/instance_resource.h" +#include "google/cloud/bigtable/internal/channel_usage.h" +#include "google/cloud/bigtable/internal/connection_refresh_state.h" +#include "google/cloud/bigtable/internal/stub_manager.h" +#include "google/cloud/bigtable/options.h" +#include "google/cloud/completion_queue.h" +#include "google/cloud/internal/clock.h" +#include "google/cloud/internal/random.h" +#include "google/cloud/version.h" +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +// TODO(#16035): Move this struct and Option to bigtable/options.h in the +// experimental namespace when the feature is ready. +struct DynamicChannelPoolSizingPolicy { + // To reduce channel churn, the pool will not add channels more frequently + // than this period. + std::chrono::milliseconds pool_size_increase_cooldown_interval = + std::chrono::seconds(10); + + // Removing unused channels is not as performance critical as adding channels + // to handle a surge in RPC calls. Thus, there are separate cooldown settings + // for each. + std::chrono::milliseconds pool_size_decrease_cooldown_interval = + std::chrono::seconds(120); + + struct DiscreteChannels { + int number; + }; + struct PercentageOfPoolSize { + double percentage; + }; + absl::variant + channels_to_add_per_resize = DiscreteChannels{1}; + + // If the average number of outstanding RPCs is below this threshold, + // the pool size will be decreased. + int minimum_average_outstanding_rpcs_per_channel = 1; + // If the average number of outstanding RPCs is above this threshold, + // the pool size will be increased. 
+ int maximum_average_outstanding_rpcs_per_channel = 25; + + // When channels are removed from the pool, we have to wait until all + // outstanding RPCs on that channel are completed before destroying it. + std::chrono::milliseconds remove_channel_polling_interval = + std::chrono::seconds(30); + + // Limits how large the pool can grow. Default is twice the minimum_pool_size. + std::size_t maximum_channel_pool_size = 0; + + // This is set to the value of GrpcNumChannelsOption. + std::size_t minimum_channel_pool_size = 0; +}; + +struct DynamicChannelPoolSizingPolicyOption { + using Type = DynamicChannelPoolSizingPolicy; +}; + +// +// This class manages a pool of Stubs wrapped in a ChannelUsage object, and +// selects one for use using a "Random Two Least Used" strategy. +// +// Based on usage data from the ChannelUsage object, the pool will add and +// remove ChannelUsage objects per the configuration present in the +// DynamicChannelPoolSizingPolicyOption. +// +template +class DynamicChannelPool + : public std::enable_shared_from_this> { + public: + using StubFactoryFn = + std::function>>( + std::uint32_t id, std::string const& instance_name, + StubManager::Priming priming)>; + + static std::shared_ptr Create( + std::string const& instance_name, CompletionQueue cq, + std::vector>> initial_channels, + std::shared_ptr refresh_state, + StubFactoryFn stub_factory_fn, + DynamicChannelPoolSizingPolicy sizing_policy = {}) { + // This logic should be performed by the StubFactory. 
+ // sizing_policy.minimum_channel_pool_size = initial_channels.size(); + // if (sizing_policy.maximum_channel_pool_size == 0) { + // sizing_policy.maximum_channel_pool_size = + // sizing_policy.minimum_channel_pool_size * 2; + // } + auto pool = std::shared_ptr(new DynamicChannelPool( + std::move(instance_name), std::move(cq), std::move(initial_channels), + std::move(refresh_state), std::move(stub_factory_fn), + std::move(sizing_policy))); + std::cout << __PRETTY_FUNCTION__ << ": return pool size=" << pool->size() + << std::endl; + return pool; + } + + ~DynamicChannelPool() { + std::scoped_lock lk(mu_); + // Eventually the channel refresh chain will terminate after this class is + // destroyed. But only after the timer futures expire on the CompletionQueue + // performing this work. We might as well cancel those timer futures now. + refresh_state_->timers().CancelAll(); + if (remove_channel_poll_timer_.valid()) remove_channel_poll_timer_.cancel(); + if (pool_size_increase_cooldown_timer_.valid()) { + pool_size_increase_cooldown_timer_.cancel(); + } + if (pool_size_decrease_cooldown_timer_.valid()) { + pool_size_decrease_cooldown_timer_.cancel(); + } + } + + // This is a snapshot aka dirty read as the size could immediately change + // after this function returns. + std::size_t size() const { + std::scoped_lock lk(mu_); + return channels_.size(); + } + + // This is a snapshot aka dirty read as the size could immediately change + // after this function returns. + bool empty() const { + std::scoped_lock lk(mu_); + return channels_.empty(); + } + + // This method is for use in testing only. + // + // This method acquires a lock on the mutex member variable and executes fn. + void InstrumentDrainingChannels( + std::function>>&)> fn) { + std::cout << __PRETTY_FUNCTION__ << ": enter" << std::endl; + std::scoped_lock lk(mu_); + fn(draining_channels_); + } + + // If the pool is not under a pool resize cooldown, call + // CheckPoolChannelHealth. 
+ // + // Pick two random channels from channels_ and return the channel with the + // lower number of outstanding_rpcs. This is the "quick" path. + // + // If one or both of the random channels have been marked unhealthy after a + // refresh, continue choosing random channels to find a pair of healthy + // channels to compare. Any channels found to be unhealthy are moved from + // channels_ to draining_channels_ and ScheduleRemoveChannels is called. + // + // If there is only one healthy channel in the pool, use it. + // + // If there are no healthy channels in channels_, create a new channel and + // use that one. Also call ScheduleAddChannels to replenish channels_. + std::shared_ptr> GetChannelRandomTwoLeastUsed() { + std::scoped_lock lk(mu_); + std::cout << __PRETTY_FUNCTION__ << ": channels_size()=" << channels_.size() + << std::endl; + // for (auto const& c : channels_) { + // std::cout << __PRETTY_FUNCTION__ << ": channel=" << c.get() << + // std::endl; + // } + + // TODO(sdhart): simplify this + if (!pool_size_increase_cooldown_timer_.valid() || + !pool_size_decrease_cooldown_timer_.valid()) { + CheckPoolChannelHealth(lk); + } else if (pool_size_increase_cooldown_timer_.is_ready()) { + (void)pool_size_increase_cooldown_timer_.get(); + CheckPoolChannelHealth(lk); + } else if (pool_size_decrease_cooldown_timer_.is_ready()) { + (void)pool_size_decrease_cooldown_timer_.get(); + CheckPoolChannelHealth(lk); + } + + // std::cout << __PRETTY_FUNCTION__ << ": finished + // CheckPoolChannelHealth" + // << std::endl; + // std::vector< + // typename std::vector>>::iterator> + // iterators; + ChannelSelectionData d; + d.iterators.reserve(channels_.size()); + + for (auto iter = channels_.begin(); iter != channels_.end(); ++iter) { + d.iterators.push_back(iter); + } + std::shuffle(d.iterators.begin(), d.iterators.end(), rng_); + d.shuffle_iter = d.iterators.begin(); + // typename std::vector>>::iterator + // channel_1_iter; + // typename std::vector>>::iterator + //
channel_2_iter; + // StatusOr channel_1_rpcs = Status{StatusCode::kNotFound, ""}; + // StatusOr channel_2_rpcs = Status{StatusCode::kNotFound, ""}; + // typename std::vector< + // typename + // std::vector>>::iterator>::iterator + // shuffle_iter = iterators.begin(); + + if (d.shuffle_iter != d.iterators.end()) { + d.channel_1_iter = *d.shuffle_iter; + d.channel_1_rpcs = (*d.channel_1_iter)->average_outstanding_rpcs(); + ++d.shuffle_iter; + } + + if (d.shuffle_iter != d.iterators.end()) { + d.channel_2_iter = *d.shuffle_iter; + d.channel_2_rpcs = (*d.channel_2_iter)->average_outstanding_rpcs(); + } + + // This is the most common case so we try it first. + if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": 2 ok channels(" + << *d.channel_1_rpcs << ", " << *d.channel_2_rpcs + << "), returning smaller" << std::endl; + return *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter + : *d.channel_2_iter; + } + if (d.iterators.size() == 1 && d.channel_1_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ + << ": pool size is 1, returning OK channel" << std::endl; + return *d.channel_1_iter; + } + if (d.iterators.empty()) { + std::cout << __PRETTY_FUNCTION__ + << ": pool is empty, create and return channel" << std::endl; + channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_, + StubManager::Priming::kNoPriming)); + return channels_.front(); + } + + return HandleBadChannels(lk, d); + } + + private: + DynamicChannelPool( + std::string const& instance_name, CompletionQueue cq, + std::vector>> initial_wrapped_channels, + std::shared_ptr refresh_state, + StubFactoryFn stub_factory_fn, + DynamicChannelPoolSizingPolicy sizing_policy) + : instance_name_(std::move(instance_name)), + cq_(std::move(cq)), + refresh_state_(std::move(refresh_state)), + stub_factory_fn_(std::move(stub_factory_fn)), + channels_(std::move(initial_wrapped_channels)), + sizing_policy_(std::move(sizing_policy)), + 
next_channel_id_(static_cast(channels_.size())) { + std::scoped_lock lk(mu_); + SetSizeDecreaseCooldownTimer(lk); + } + + struct ChannelSelectionData { + using ChannelSelect = + typename std::vector>>::iterator; + std::vector iterators; + ChannelSelect channel_1_iter; + ChannelSelect channel_2_iter; + StatusOr channel_1_rpcs = Status{StatusCode::kNotFound, ""}; + StatusOr channel_2_rpcs = Status{StatusCode::kNotFound, ""}; + typename std::vector::iterator shuffle_iter; + }; + + std::shared_ptr> HandleBadChannels( + std::scoped_lock const& lk, ChannelSelectionData& d) { + // We have one or more bad channels. Spending time finding a good channel + // will be cheaper than trying to use a bad channel in the long run. + std::vector< + typename std::vector>>::iterator> + bad_channel_iters; + + while (!d.channel_1_rpcs.ok() && d.shuffle_iter != d.iterators.end()) { + bad_channel_iters.push_back(d.channel_1_iter); + ++d.shuffle_iter; + d.channel_1_iter = *d.shuffle_iter; + d.channel_1_rpcs = d.shuffle_iter != d.iterators.end() + ? (*d.channel_1_iter)->average_outstanding_rpcs() + : Status{StatusCode::kNotFound, ""}; + } + + while (!d.channel_2_rpcs.ok() && d.shuffle_iter != d.iterators.end()) { + bad_channel_iters.push_back(d.channel_2_iter); + ++d.shuffle_iter; + d.channel_2_iter = *d.shuffle_iter; + d.channel_2_rpcs = d.shuffle_iter != d.iterators.end() + ? (*d.channel_2_iter)->average_outstanding_rpcs() + : Status{StatusCode::kNotFound, ""}; + } + + EvictBadChannels(lk, bad_channel_iters); + ScheduleRemoveChannels(lk); + + if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": 2 ok channels" << std::endl; + return *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter + : *d.channel_2_iter; + } + if (d.channel_1_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": ONLY channel_1 ok" << std::endl; + // Schedule repopulating the pool. 
+ ScheduleAddChannels(lk); + return *d.channel_1_iter; + } + if (d.channel_2_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": ONLY channel_2 ok" << std::endl; + // Schedule repopulating the pool. + ScheduleAddChannels(lk); + return *d.channel_2_iter; + } + + // We have no usable channels in the entire pool; this is bad. + // Create a channel immediately to unblock application. + std::cout << __PRETTY_FUNCTION__ << ": NO USABLE CHANNELS" << std::endl; + channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_, + StubManager::Priming::kNoPriming)); + // TODO(sdhart): This swap may not be necessary, investigate. + std::swap(channels_.front(), channels_.back()); + // Schedule repopulating the pool. + ScheduleAddChannels(lk); + return channels_.front(); + } + + struct ChannelAddVisitor { + std::size_t pool_size; + explicit ChannelAddVisitor(std::size_t pool_size) : pool_size(pool_size) {} + std::size_t operator()( + typename DynamicChannelPoolSizingPolicy::DiscreteChannels const& c) { + return c.number; + } + std::size_t operator()( + typename DynamicChannelPoolSizingPolicy::PercentageOfPoolSize const& + c) { + return static_cast( + std::floor(static_cast(pool_size) * c.percentage)); + } + }; + + // Determines the number of channels to add and reserves the channel ids to + // be used. Lastly, it calls CompletionQueue::RunAsync with a callback that + // executes AddChannels with the reserved ids. + void ScheduleAddChannels(std::scoped_lock const&) { + std::size_t num_channels_to_add; + // If we're undersized due to bad channels, get us back to the minimum size. 
+ if (channels_.size() < sizing_policy_.minimum_channel_pool_size) { + num_channels_to_add = + sizing_policy_.minimum_channel_pool_size - channels_.size(); + } else { + num_channels_to_add = + std::min(sizing_policy_.maximum_channel_pool_size - channels_.size(), + absl::visit(ChannelAddVisitor(channels_.size()), + sizing_policy_.channels_to_add_per_resize)); + } + std::vector new_channel_ids; + new_channel_ids.reserve(num_channels_to_add); + for (std::size_t i = 0; i < num_channels_to_add; ++i) { + new_channel_ids.push_back(next_channel_id_++); + } + + std::weak_ptr> foo = this->shared_from_this(); + cq_.RunAsync([new_channel_ids = std::move(new_channel_ids), + weak = std::move(foo)]() { + if (auto self = weak.lock()) { + self->AddChannels(new_channel_ids); + } + }); + } + + // Creates the new channels using the stub_factory_fn and only after that + // locks the mutex to add the new channels. + void AddChannels(std::vector const& new_channel_ids) { + std::vector>> new_stubs; + new_stubs.reserve(new_channel_ids.size()); + for (auto const& id : new_channel_ids) { + auto new_stub = stub_factory_fn_( + id, instance_name_, StubManager::Priming::kSynchronousPriming); + if (new_stub.ok()) new_stubs.push_back(*std::move(new_stub)); + } + std::scoped_lock lk(mu_); + channels_.insert(channels_.end(), + std::make_move_iterator(new_stubs.begin()), + std::make_move_iterator(new_stubs.end())); + } + + // Calls CompletionQueue::MakeRelativeTimer using + // remove_channel_polling_interval with a callback that executes + // RemoveChannels.
+ void ScheduleRemoveChannels(std::scoped_lock const&) { + if (remove_channel_poll_timer_.valid()) return; + std::cout << __PRETTY_FUNCTION__ << ": set remove_channel_poll_timer" + << std::endl; + std::weak_ptr> foo = this->shared_from_this(); + remove_channel_poll_timer_ = + cq_.MakeRelativeTimer(sizing_policy_.remove_channel_polling_interval) + .then( + [weak = std::move(foo)]( + future> f) { + if (f.get().ok()) { + if (auto self = weak.lock()) { + self->RemoveChannels(); + } + } + }); + } + + // Locks the mutex, reverse sorts draining_channels_, calling pop_back until + // either draining_channels_ is empty or a channel with outstanding_rpcs is + // encountered. Calls ScheduleRemoveChannels if draining_channels_ is + // non-empty. + void RemoveChannels() { + std::scoped_lock lk(mu_); + std::cout << __PRETTY_FUNCTION__ << ": enter" << std::endl; + std::sort(draining_channels_.begin(), draining_channels_.end(), + [](std::shared_ptr> const& a, + std::shared_ptr> b) { + auto rpcs_a = a->instant_outstanding_rpcs(); + auto rpcs_b = b->instant_outstanding_rpcs(); + if (!rpcs_a.ok()) return false; + if (!rpcs_b.ok()) return true; + return *rpcs_a > *rpcs_b; + }); + std::cout << __PRETTY_FUNCTION__ + << ": draining_channels_.size()=" << draining_channels_.size() + << std::endl; + while (!draining_channels_.empty()) { + auto outstanding_rpcs = + draining_channels_.back()->instant_outstanding_rpcs(); + std::cout << __PRETTY_FUNCTION__ + << ": outstanding_rpcs=" << *outstanding_rpcs << std::endl; + if (outstanding_rpcs.ok() && *outstanding_rpcs > 0) { + ScheduleRemoveChannels(lk); + return; + } + std::cout << __PRETTY_FUNCTION__ << ": draining_channels_.pop_back()" + << std::endl; + draining_channels_.pop_back(); + } + // TODO(sdhart): If iterators becomes a member variable perhaps add logic to + // call + // shrink_to_fit on iterators_ if there's a large + // difference between iterators_.capacity and channels_.size + } + + void EvictBadChannels( + std::scoped_lock const&, 
+ std::vector< + typename std::vector>>::iterator>& + bad_channel_iters) { + auto back_iter = channels_.rbegin(); + for (auto& bad_channel_iter : bad_channel_iters) { + bool swapped = false; + while (!swapped) { + auto b = (*back_iter)->instant_outstanding_rpcs(); + if (b.ok()) { + std::swap(*back_iter, *bad_channel_iter); + draining_channels_.push_back(std::move(*back_iter)); + swapped = true; + } + ++back_iter; + } + } + for (std::size_t i = 0; i < bad_channel_iters.size(); ++i) { + channels_.pop_back(); + } + } + + void SetSizeIncreaseCooldownTimer(std::scoped_lock const&) { + pool_size_increase_cooldown_timer_ = cq_.MakeRelativeTimer( + sizing_policy_.pool_size_increase_cooldown_interval); + } + + void SetSizeDecreaseCooldownTimer(std::scoped_lock const&) { + pool_size_decrease_cooldown_timer_ = cq_.MakeRelativeTimer( + sizing_policy_.pool_size_decrease_cooldown_interval); + } + + // Computes the average_rpc_per_channel across all channels in the pool, + // excluding any channels that are awaiting removal in draining_channels_. + // The computed average is compared to the thresholds in the sizing policy + // and calls either ScheduleRemoveChannels or ScheduleAddChannels as + // appropriate. If either is called the resize_cooldown_timer is also set. + void CheckPoolChannelHealth(std::scoped_lock const& lk) { + int average_rpc_per_channel = + channels_.empty() + ? 0 + : std::accumulate(channels_.begin(), channels_.end(), 0, + [](int a, auto const& b) { + auto rpcs_b = b->average_outstanding_rpcs(); + return a + (rpcs_b.ok() ? *rpcs_b : 0); + }) / + static_cast(channels_.size()); + std::cout << __PRETTY_FUNCTION__ + << ": channels_.size()=" << channels_.size() + << "; sizing_policy_.minimum_channel_pool_size=" + << sizing_policy_.minimum_channel_pool_size + << "; average_rpc_per_channel=" << average_rpc_per_channel + << std::endl; + // TODO(sdhart): do we need to check if we're over max pool size here?
+ if (average_rpc_per_channel < + sizing_policy_.minimum_average_outstanding_rpcs_per_channel && + channels_.size() > sizing_policy_.minimum_channel_pool_size) { + std::cout << __PRETTY_FUNCTION__ << ": remove channel" << std::endl; + auto random_channel = std::uniform_int_distribution( + 0, channels_.size() - 1)(rng_); + std::swap(channels_[random_channel], channels_.back()); + draining_channels_.push_back(std::move(channels_.back())); + channels_.pop_back(); + ScheduleRemoveChannels(lk); + SetSizeDecreaseCooldownTimer(lk); + } + // TODO(sdhart): do we need to check if we're under min pool size here? + if (average_rpc_per_channel > + sizing_policy_.maximum_average_outstanding_rpcs_per_channel && + channels_.size() < sizing_policy_.maximum_channel_pool_size) { + std::cout << __PRETTY_FUNCTION__ << ": add channel" << std::endl; + // Channel/stub creation is expensive, instead of making the current RPC + // wait on this, use an existing channel right now, and schedule a channel + // to be added. 
+ ScheduleAddChannels(lk); + SetSizeIncreaseCooldownTimer(lk); + } + } + + mutable std::mutex mu_; + std::string instance_name_; + CompletionQueue cq_; + google::cloud::internal::DefaultPRNG rng_; + std::shared_ptr refresh_state_; + StubFactoryFn stub_factory_fn_; + std::vector>> channels_; + DynamicChannelPoolSizingPolicy sizing_policy_; + std::vector>> draining_channels_; + future remove_channel_poll_timer_; + future> + pool_size_increase_cooldown_timer_; + future> + pool_size_decrease_cooldown_timer_; + std::uint32_t next_channel_id_; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc new file mode 100644 index 0000000000000..d684f3d95d8be --- /dev/null +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -0,0 +1,353 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "google/cloud/bigtable/internal/dynamic_channel_pool.h" +#include "google/cloud/bigtable/testing/mock_bigtable_stub.h" +#include "google/cloud/internal/make_status.h" +#include "google/cloud/testing_util/fake_clock.h" +#include "google/cloud/testing_util/fake_completion_queue_impl.h" +#include "google/cloud/testing_util/mock_completion_queue_impl.h" +#include "google/cloud/testing_util/status_matchers.h" +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::google::cloud::bigtable::testing::MockBigtableStub; +using ::google::cloud::testing_util::FakeCompletionQueueImpl; +using ::google::cloud::testing_util::FakeSteadyClock; +// using ::google::cloud::testing_util::IsOkAndHolds; +using ::google::cloud::testing_util::MockCompletionQueueImpl; +// using ::google::cloud::testing_util::StatusIs; +using ::testing::Eq; +using ::testing::IsEmpty; +using ::testing::MockFunction; + +class DynamicChannelPoolTest : public ::testing::Test { + public: + DynamicChannelPoolTest() + : fake_cq_impl_(std::make_shared()), + mock_cq_impl_(std::make_shared()), + fake_clock_impl_(std::make_shared()), + thread_([this] { cq_.Run(); }) {} + + ~DynamicChannelPoolTest() override { + cq_.Shutdown(); + thread_.join(); + } + + protected: + std::shared_ptr fake_cq_impl_; + std::shared_ptr mock_cq_impl_; + std::shared_ptr fake_clock_impl_; + CompletionQueue cq_; + std::thread thread_; +}; + +TEST_F(DynamicChannelPoolTest, SelectLeastUsedFromTwoChannels) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + auto stub_factory_fn = [&](int, std::string const&, StubManager::Priming) { + return std::make_shared>( + std::make_shared(), fake_clock_impl_, 20); + }; + + std::vector>> channels; + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(*mock_stub_1, CheckAndMutateRow).Times(0); + int initial_rpc_count = 0; + channels.push_back(std::make_shared>( + std::move(mock_stub_0), fake_clock_impl_, initial_rpc_count++)); + channels.push_back(std::make_shared>( + std::move(mock_stub_1), fake_clock_impl_, initial_rpc_count)); + + // Pool creation should set the pool size decrease cooldown timer.
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + sizing_policy.maximum_channel_pool_size = 2; + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn, sizing_policy); + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, OneInitialChannel) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool after creation. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + { // Pool created with 1 channel. 
+ auto mock_stub = std::make_shared(); + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + EXPECT_CALL(stub_factory_fn, Call).Times(0); + // .WillOnce(::testing::Return( + // std::make_shared>(mock_stub))); + + std::vector>> channels; + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + int initial_rpc_count = 0; + channels.push_back(std::make_shared>( + std::move(mock_stub_0), fake_clock_impl_, initial_rpc_count)); + + // Pool creation should set the pool size decrease cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer( + sizing_policy.pool_size_decrease_cooldown_interval); + }); + + sizing_policy.maximum_channel_pool_size = 1; + sizing_policy.minimum_channel_pool_size = 1; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + EXPECT_THAT(pool->size(), Eq(1)); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + } + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, EmptyInitialPool) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool after
creation. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + { // Pool created with 0 channels. + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + EXPECT_CALL(stub_factory_fn, Call) + .Times(1) + .WillOnce(::testing::Return( + std::make_shared>(mock_stub))); + + std::vector>> channels; + + // Pool creation should set the pool size increase cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + sizing_policy.maximum_channel_pool_size = 1; + sizing_policy.minimum_channel_pool_size = 0; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + + EXPECT_THAT(*pool, ::testing::IsEmpty()); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + + EXPECT_THAT(pool->size(), Eq(1)); + } + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, ShrinkPool) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy 
sizing_policy; + + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + auto stub_factory_fn = [&](int, std::string const&, StubManager::Priming) { + return std::make_shared>( + std::make_shared(), fake_clock_impl_, 20); + }; + + std::vector>> channels; + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + int initial_rpc_count = 0; + channels.push_back(std::make_shared>( + std::move(mock_stub_0), fake_clock_impl_, initial_rpc_count++)); + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(*mock_stub_1, CheckAndMutateRow).Times(0); + channels.push_back(std::make_shared>( + std::move(mock_stub_1), fake_clock_impl_, initial_rpc_count)); + + // Pool creation should set the pool size increase cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }) + // Getting a channel from the pool after the initial cooldown timer has + // expired should remove a channel from selection consideration and + // trigger decreasing the underutilized pool. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::nanoseconds(0)); + }) + // And the decrease cooldown timer should be set again. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + sizing_policy.maximum_channel_pool_size = 2; + sizing_policy.minimum_channel_pool_size = 1; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn, sizing_policy); + EXPECT_THAT(pool->size(), Eq(2)); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(1)); + + auto draining_channels_size_one = + [](std::vector>>& c) { + EXPECT_THAT(c.size(), Eq(1)); + std::cout << "draining rpcs=" << *c.front()->instant_outstanding_rpcs() + << std::endl; + c.front()->ReleaseStub(); + }; + pool->InstrumentDrainingChannels(draining_channels_size_one); + + fake_clock_impl_->AdvanceTime(sizing_policy.remove_channel_polling_interval + + std::chrono::seconds(1)); + + auto draining_channels_empty = + [](std::vector>>& c) { + EXPECT_THAT(c, IsEmpty()); + }; + + // Execute this check on the CompletionQueue thread to maintain sequencing + // with execution of RemoveChannels. 
+ cq_.RunAsync([pool, fn = std::move(draining_channels_empty)]() { + pool->InstrumentDrainingChannels(fn); + }); + + fake_cq_impl_->SimulateCompletion(false); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google From a2302f59f49782fb174d19d931d5ee11e2a0bcd8 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Wed, 11 Mar 2026 15:38:39 -0400 Subject: [PATCH 2/3] add testing wrapper --- .../bigtable/internal/dynamic_channel_pool.h | 10 +- .../internal/dynamic_channel_pool_test.cc | 99 ++++++++++++++++++- 2 files changed, 106 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h index 4e7de773cbfba..265e970d84cb8 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool.h +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -151,13 +151,19 @@ class DynamicChannelPool // This method is for use in testing only. // // This method acquires a lock on the mutex member variable and executes fn. - void InstrumentDrainingChannels( + void InstrumentDrainingChannelsLocking( std::function>>&)> fn) { std::cout << __PRETTY_FUNCTION__ << ": enter" << std::endl; std::scoped_lock lk(mu_); fn(draining_channels_); } + void InstrumentDrainingChannelsNonLocking( + std::function>>&)> fn) { + std::cout << __PRETTY_FUNCTION__ << ": enter" << std::endl; + fn(draining_channels_); + } + // If the pool is not under a pool resize cooldown, call // CheckPoolChannelHealth. 
// @@ -255,6 +261,8 @@ class DynamicChannelPool } private: + friend class DynamicChannelPoolTestWrapper; + DynamicChannelPool( std::string const& instance_name, CompletionQueue cq, std::vector>> initial_wrapped_channels, diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc index d684f3d95d8be..faa6e9087d6cb 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -26,6 +26,57 @@ namespace google { namespace cloud { namespace bigtable_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +class DynamicChannelPoolTestWrapper { + public: + explicit DynamicChannelPoolTestWrapper( + std::shared_ptr> pool) + : pool_(std::move(pool)) {} + + std::shared_ptr> HandleBadChannels( + std::scoped_lock const& lk, + DynamicChannelPool::ChannelSelectionData& d) { + return pool_->HandleBadChannels(lk, d); + } + + void ScheduleAddChannels(std::scoped_lock const& lk) { + pool_->ScheduleRemoveChannels(lk); + } + + void AddChannels(std::vector const& new_channel_ids) { + pool_->AddChannels(new_channel_ids); + } + + void ScheduleRemoveChannels(std::scoped_lock const& lk) { + pool_->ScheduleRemoveChannels(lk); + } + + void RemoveChannels() { pool_->RemoveChannels(); } + + void EvictBadChannels( + std::scoped_lock const& lk, + std::vector>>::iterator>& + bad_channel_iters) { + pool_->EvictBadChannels(lk, bad_channel_iters); + } + + void SetSizeIncreaseCooldownTimer(std::scoped_lock const& lk) { + pool_->SetSizeIncreaseCooldownTimer(lk); + } + + void SetSizeDecreaseCooldownTimer(std::scoped_lock const& lk) { + pool_->SetSizeDecreaseCooldownTimer(lk); + } + + void CheckPoolChannelHealth(std::scoped_lock const& lk) { + pool_->CheckPoolChannelHealth(lk); + } + + protected: + std::shared_ptr> pool_; +}; + namespace { using ::google::cloud::bigtable::testing::MockBigtableStub; @@ -245,6 +296,7 @@ TEST_F(DynamicChannelPoolTest, 
EmptyInitialPool) { fake_cq_impl_->SimulateCompletion(false); } +#if 0 TEST_F(DynamicChannelPoolTest, ShrinkPool) { auto instance_name = bigtable::InstanceResource(Project("my-project"), "my-instance") @@ -327,7 +379,7 @@ TEST_F(DynamicChannelPoolTest, ShrinkPool) { << std::endl; c.front()->ReleaseStub(); }; - pool->InstrumentDrainingChannels(draining_channels_size_one); + pool->InstrumentDrainingChannelsNonLocking(draining_channels_size_one); fake_clock_impl_->AdvanceTime(sizing_policy.remove_channel_polling_interval + std::chrono::seconds(1)); @@ -340,11 +392,54 @@ TEST_F(DynamicChannelPoolTest, ShrinkPool) { // Execute this check on the CompletionQueue thread to maintain sequencing // with execution of RemoveChannels. cq_.RunAsync([pool, fn = std::move(draining_channels_empty)]() { - pool->InstrumentDrainingChannels(fn); + pool->InstrumentDrainingChannelsNonLocking(fn); }); fake_cq_impl_->SimulateCompletion(false); } +#endif + +TEST_F(DynamicChannelPoolTest, AddChannels) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + auto mock_stub_0 = std::make_shared(); + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(stub_factory_fn, Call) + .WillOnce([&](int id, std::string const& instance, StubManager::Priming priming) { + EXPECT_THAT(id, Eq(0)); + EXPECT_THAT(instance, Eq(instance_name)); + EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); + return std::make_shared>( + mock_stub_0, fake_clock_impl_, 20); + }) + .WillOnce([&](int id, std::string const& instance, StubManager::Priming priming) { + EXPECT_THAT(id, Eq(1)); + EXPECT_THAT(instance, Eq(instance_name)); + EXPECT_THAT(priming, 
Eq(StubManager::Priming::kSynchronousPriming)); + return std::make_shared>( + mock_stub_1, fake_clock_impl_, 20); + }); + + std::vector>> channels; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + std::vector new_channel_ids = {0, 1}; + wrapper.AddChannels(new_channel_ids); + EXPECT_THAT(pool->size(), Eq(2)); + fake_cq_impl_->SimulateCompletion(false); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END From 62f26637a06887185b52a2d4d870c0f13349442f Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Wed, 11 Mar 2026 20:00:52 -0400 Subject: [PATCH 3/3] more tests --- .../bigtable/internal/dynamic_channel_pool.h | 9 +- .../internal/dynamic_channel_pool_test.cc | 502 ++++++++++++++++-- 2 files changed, 466 insertions(+), 45 deletions(-) diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h index 265e970d84cb8..c00072ccc2015 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool.h +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -50,9 +50,12 @@ struct DynamicChannelPoolSizingPolicy { std::chrono::seconds(120); struct DiscreteChannels { + explicit DiscreteChannels(int number = 0) : number(number) {} int number; }; struct PercentageOfPoolSize { + explicit PercentageOfPoolSize(double percentage = 0.0) + : percentage(percentage) {} double percentage; }; absl::variant @@ -368,7 +371,9 @@ class DynamicChannelPool // Determines the number of channels to add and reserves the channel ids to // be used. Lastly, it calls CompletionQueue::RunAsync with a callback that // executes AddChannels with the reserved ids. 
- void ScheduleAddChannels(std::scoped_lock const&) { + void ScheduleAddChannels( + std::scoped_lock const&, + std::function const&)> const& test_fn = nullptr) { std::size_t num_channels_to_add; // If we're undersized due to bad channels, get us back to the minimum size. if (channels_.size() < sizing_policy_.minimum_channel_pool_size) { @@ -386,6 +391,8 @@ class DynamicChannelPool new_channel_ids.push_back(next_channel_id_++); } + if (test_fn) test_fn(new_channel_ids); + std::weak_ptr> foo = this->shared_from_this(); cq_.RunAsync([new_channel_ids = std::move(new_channel_ids), weak = std::move(foo)]() { diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc index faa6e9087d6cb..14f356bf383c1 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -39,8 +39,10 @@ class DynamicChannelPoolTestWrapper { return pool_->HandleBadChannels(lk, d); } - void ScheduleAddChannels(std::scoped_lock const& lk) { - pool_->ScheduleRemoveChannels(lk); + void ScheduleAddChannels( + std::scoped_lock const& lk, + std::function const&)> test_fn) { + pool_->ScheduleAddChannels(lk, test_fn); } void AddChannels(std::vector const& new_channel_ids) { @@ -73,6 +75,21 @@ class DynamicChannelPoolTestWrapper { pool_->CheckPoolChannelHealth(lk); } + std::scoped_lock CreateLock() { + return std::scoped_lock(pool_->mu_); + } + + void SetRemoveChannelPollTimer(promise& p) { + pool_->remove_channel_poll_timer_ = p.get_future(); + } + + std::vector>> const& + SetDrainingChannels(std::vector>> + draining_channels) { + pool_->draining_channels_ = std::move(draining_channels); + return pool_->draining_channels_; + } + protected: std::shared_ptr> pool_; }; @@ -296,6 +313,445 @@ TEST_F(DynamicChannelPoolTest, EmptyInitialPool) { fake_cq_impl_->SimulateCompletion(false); } +TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolUndersized) { + 
auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + sizing_policy.minimum_channel_pool_size = 10; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto test_fn = [](std::vector const& new_channel_ids) { + EXPECT_THAT(new_channel_ids, + ::testing::ElementsAreArray({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + }; + { + auto lk = wrapper.CreateLock(); + wrapper.ScheduleAddChannels(lk, test_fn); + } +} + +TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolNearMax) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + DynamicChannelPoolSizingPolicy sizing_policy; + sizing_policy.minimum_channel_pool_size = 2; + sizing_policy.maximum_channel_pool_size = 3; + 
sizing_policy.channels_to_add_per_resize = + DynamicChannelPoolSizingPolicy::DiscreteChannels(3); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto test_fn = [](std::vector const& new_channel_ids) { + EXPECT_THAT(new_channel_ids, ::testing::ElementsAreArray({2})); + }; + + { + auto lk = wrapper.CreateLock(); + wrapper.ScheduleAddChannels(lk, test_fn); + } +} + +TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolNotNearMax) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + DynamicChannelPoolSizingPolicy sizing_policy; + sizing_policy.minimum_channel_pool_size = 2; + sizing_policy.maximum_channel_pool_size = 10; + sizing_policy.channels_to_add_per_resize = + DynamicChannelPoolSizingPolicy::DiscreteChannels(3); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto test_fn = [](std::vector const& new_channel_ids) { + EXPECT_THAT(new_channel_ids, ::testing::ElementsAreArray({2, 3, 4})); + }; + + { + auto lk = wrapper.CreateLock(); + wrapper.ScheduleAddChannels(lk, test_fn); + } +} + +TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolNotNearMaxPercentage) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + DynamicChannelPoolSizingPolicy sizing_policy; + sizing_policy.minimum_channel_pool_size = 2; + sizing_policy.maximum_channel_pool_size = 10; + sizing_policy.channels_to_add_per_resize = + DynamicChannelPoolSizingPolicy::PercentageOfPoolSize(0.5); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto test_fn = [](std::vector const& new_channel_ids) { + EXPECT_THAT(new_channel_ids, ::testing::ElementsAreArray({2})); + }; + + { + auto lk = wrapper.CreateLock(); + wrapper.ScheduleAddChannels(lk, test_fn); + } +} + +TEST_F(DynamicChannelPoolTest, AddChannels) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + auto mock_stub_0 = std::make_shared(); + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(stub_factory_fn, Call) + .WillOnce([&](int id, std::string const& instance, + StubManager::Priming priming) { + EXPECT_THAT(id, Eq(0)); + EXPECT_THAT(instance, Eq(instance_name)); + EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); + return std::make_shared>( + mock_stub_0, fake_clock_impl_, 20); + }) + .WillOnce([&](int id, std::string const& instance, + StubManager::Priming priming) { + EXPECT_THAT(id, Eq(1)); + EXPECT_THAT(instance, Eq(instance_name)); + EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); + return std::make_shared>( + mock_stub_1, fake_clock_impl_, 20); + }); + + std::vector>> channels; + // Pool creation should set the pool size 
increase cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + std::vector new_channel_ids = {0, 1}; + wrapper.AddChannels(new_channel_ids); + EXPECT_THAT(pool->size(), Eq(2)); + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, ScheduleRemoveChannelsAlreadyPending) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + promise p; + wrapper.SetRemoveChannelPollTimer(p); + { + auto lk = wrapper.CreateLock(); + wrapper.ScheduleRemoveChannels(lk); + ; + } + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, ScheduleRemoveChannelsNotAlreadyPending) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::nanoseconds(0)); + }); + + { + auto lk = wrapper.CreateLock(); + wrapper.ScheduleRemoveChannels(lk); + ; + } + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, RemoveChannelsLoneChannelDrained) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + std::vector>> draining_channels; + draining_channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + auto const& d = wrapper.SetDrainingChannels(draining_channels); + + // EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // .WillOnce([&](std::chrono::nanoseconds ns) { + // EXPECT_THAT(ns.count(), + // Eq(std::chrono::nanoseconds( + // sizing_policy.remove_channel_polling_interval) + // .count())); + // return cq_.MakeRelativeTimer(std::chrono::nanoseconds(0)); + // }); + + wrapper.RemoveChannels(); + EXPECT_THAT(d, IsEmpty()); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, RemoveChannelsSomeChannelsDrained) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + // Pool creation should set the pool size increase cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + std::vector>> draining_channels; + draining_channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 1)); + draining_channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + draining_channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 0)); + draining_channels.push_back(std::make_shared>( + std::make_shared(), fake_clock_impl_, 2)); + auto const& d = wrapper.SetDrainingChannels(draining_channels); + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::nanoseconds(0)); + }); + + wrapper.RemoveChannels(); + ASSERT_THAT(d.size(), Eq(2)); + EXPECT_THAT(d[0]->instant_outstanding_rpcs(), testing_util::IsOkAndHolds(2)); + EXPECT_THAT(d[1]->instant_outstanding_rpcs(), testing_util::IsOkAndHolds(1)); + + fake_cq_impl_->SimulateCompletion(false); +} + #if 0 TEST_F(DynamicChannelPoolTest, ShrinkPool) { auto instance_name = @@ -399,48 +855,6 @@ TEST_F(DynamicChannelPoolTest, ShrinkPool) { } #endif -TEST_F(DynamicChannelPoolTest, AddChannels) { - auto instance_name = - bigtable::InstanceResource(Project("my-project"), "my-instance") - .FullName(); - DynamicChannelPoolSizingPolicy sizing_policy; - auto refresh_state = std::make_shared( - fake_cq_impl_, std::chrono::milliseconds(1), - 
std::chrono::milliseconds(10)); - - MockFunction>>( - std::uint32_t, std::string const&, StubManager::Priming)> - stub_factory_fn; - auto mock_stub_0 = std::make_shared(); - auto mock_stub_1 = std::make_shared(); - EXPECT_CALL(stub_factory_fn, Call) - .WillOnce([&](int id, std::string const& instance, StubManager::Priming priming) { - EXPECT_THAT(id, Eq(0)); - EXPECT_THAT(instance, Eq(instance_name)); - EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); - return std::make_shared>( - mock_stub_0, fake_clock_impl_, 20); - }) - .WillOnce([&](int id, std::string const& instance, StubManager::Priming priming) { - EXPECT_THAT(id, Eq(1)); - EXPECT_THAT(instance, Eq(instance_name)); - EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); - return std::make_shared>( - mock_stub_1, fake_clock_impl_, 20); - }); - - std::vector>> channels; - auto pool = DynamicChannelPool::Create( - instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, - stub_factory_fn.AsStdFunction(), sizing_policy); - DynamicChannelPoolTestWrapper wrapper(pool); - std::vector new_channel_ids = {0, 1}; - wrapper.AddChannels(new_channel_ids); - EXPECT_THAT(pool->size(), Eq(2)); - fake_cq_impl_->SimulateCompletion(false); -} - - } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal