Skip to content

Commit ff2b097

Browse files
markdrothcopybara-github
authored andcommitted
[xDS] fix TSAN failure in callback-based LRS impl (grpc#39806)
This fixes a bug introduced in grpc#39736 whereby the LRS server may call `StartWrite()` after cancellation has already called `Finish()`, thus leading to a TSAN failure. This is arguably a deficiency in the callback API. The write should definitely fail, but it shouldn't cause a TSAN problem. But we can consider that as part of a separate effort. Example stack trace of the TSAN failure: ``` WARNING: ThreadSanitizer: data race (pid=17) Read of size 1 at 0x72680002dd80 by thread T5: #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:742:18 (xds_cluster_end2end_test@poller=epoll1+0x32896b2) #1 grpc::ServerBidiReactor::Finish(grpc::Status) /proc/self/cwd/include/grpcpp/support/server_callback.h:414:13 (xds_cluster_end2end_test@poller=epoll1+0x328dbb4) #2 grpc::testing::LrsServiceImpl::Reactor::OnCancel() /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:494:3 (xds_cluster_end2end_test@poller=epoll1+0x32c6ec1) #3 grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0::operator()() const /proc/self/cwd/src/cpp/server/server_callback.cc:38:14 (xds_cluster_end2end_test@poller=epoll1+0x3b4755b) #4 void std::__invoke_impl(std::__invoke_other, grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3b474d5) #5 std::__invoke_result::type std::__invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47485) #6 std::invoke_result::type std::invoke(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x3b47435) #7 void absl::lts_20250127::internal_any_invocable::InvokeR(grpc::internal::ServerCallbackCall::CallOnCancel(grpc::internal::ServerReactor*)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x3b473d5) #8 void absl::lts_20250127::internal_any_invocable::LocalInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:310:10 (xds_cluster_end2end_test@poller=epoll1+0x3b472f2) #9 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef) #10 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad) #11 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663) #12 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47) #13 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049) #14 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9) #15 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0) #16 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819) Previous write of size 1 at 0x72680002dd80 by thread T4: #0 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::Write(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:790:38 (xds_cluster_end2end_test@poller=epoll1+0x3289e85) #1 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*, grpc::WriteOptions) /proc/self/cwd/include/grpcpp/support/server_callback.h:350:13 (xds_cluster_end2end_test@poller=epoll1+0x32ef139) #2 grpc::ServerBidiReactor::StartWrite(envoy::service::load_stats::v3::LoadStatsResponse const*) /proc/self/cwd/include/grpcpp/support/server_callback.h:328:5 (xds_cluster_end2end_test@poller=epoll1+0x32cb6df) #3 grpc::testing::LrsServiceImpl::Reactor::OnReadDone(bool) /proc/self/cwd/test/cpp/end2end/xds/xds_server.cc:461:5 (xds_cluster_end2end_test@poller=epoll1+0x32c6535) #4 grpc::internal::CallbackBidiHandler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)::operator()(bool) const /proc/self/cwd/include/grpcpp/impl/server_callback_handlers.h:839:22 (xds_cluster_end2end_test@poller=epoll1+0x328f196) #5 std::_Function_handler::ServerCallbackReaderWriterImpl::SetupReactor(grpc::ServerBidiReactor*)::'lambda0'(bool)>::_M_invoke(std::_Any_data const&, bool&&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (xds_cluster_end2end_test@poller=epoll1+0x328ef1a) #6 std::function::operator()(bool) const /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (xds_cluster_end2end_test@poller=epoll1+0x3279bb8) #7 grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'()::operator()() const /proc/self/cwd/include/grpcpp/support/callback_common.h:230:11 (xds_cluster_end2end_test@poller=epoll1+0x3279ae5) #8 void std::__invoke_impl(std::__invoke_other, grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a65) #9 std::__invoke_result::type std::__invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x3279a15) #10 std::invoke_result::type std::invoke(grpc::internal::CallbackWithSuccessTag::Run(bool)::'lambda'() const&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x32799c5) #11 void absl::lts_20250127::functional_internal::InvokeObject(absl::lts_20250127::functional_internal::VoidPtr) /proc/self/cwd/external/com_google_absl/absl/functional/internal/function_ref.h:78:7 (xds_cluster_end2end_test@poller=epoll1+0x327996d) #12 absl::lts_20250127::FunctionRef::operator()() const /proc/self/cwd/external/com_google_absl/absl/functional/function_ref.h:132:12 (xds_cluster_end2end_test@poller=epoll1+0x3bd0221) #13 void grpc::GlobalCallbackHook::CatchingCallback&>(absl::lts_20250127::FunctionRef&) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:41:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd01a1) #14 grpc::DefaultGlobalCallbackHook::RunCallback(grpc_call*, absl::lts_20250127::FunctionRef) /proc/self/cwd/include/grpcpp/support/global_callback_hook.h:50:5 (xds_cluster_end2end_test@poller=epoll1+0x3bd011d) #15 grpc::internal::CallbackWithSuccessTag::Run(bool) /proc/self/cwd/include/grpcpp/support/callback_common.h:227:32 (xds_cluster_end2end_test@poller=epoll1+0x3279715) #16 grpc::internal::CallbackWithSuccessTag::StaticRun(grpc_completion_queue_functor*, int) /proc/self/cwd/include/grpcpp/support/callback_common.h:212:47 (xds_cluster_end2end_test@poller=epoll1+0x3279329) #17 cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0::operator()() const /proc/self/cwd/src/core/lib/surface/completion_queue.cc:913:9 (xds_cluster_end2end_test@poller=epoll1+0x545e2e3) #18 void std::__invoke_impl(std::__invoke_other, cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (xds_cluster_end2end_test@poller=epoll1+0x545e205) #19 std::__invoke_result::type std::__invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (xds_cluster_end2end_test@poller=epoll1+0x545e1b5) #20 std::invoke_result::type std::invoke(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /usr/lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:81:14 (xds_cluster_end2end_test@poller=epoll1+0x545e165) grpc#21 void absl::lts_20250127::internal_any_invocable::InvokeR(cq_end_op_for_callback(grpc_completion_queue*, void*, absl::lts_20250127::Status, void (*)(void*, grpc_cq_completion*), void*, grpc_cq_completion*, bool)::$_0&) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:132:3 (xds_cluster_end2end_test@poller=epoll1+0x545e115) grpc#22 void absl::lts_20250127::internal_any_invocable::RemoteInvoker(absl::lts_20250127::internal_any_invocable::TypeErasedState*) /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:368:10 (xds_cluster_end2end_test@poller=epoll1+0x545df6d) grpc#23 absl::lts_20250127::internal_any_invocable::Impl::operator()() /proc/self/cwd/external/com_google_absl/absl/functional/internal/any_invocable.h:868:1 (xds_cluster_end2end_test@poller=epoll1+0x4aa14ef) grpc#24 grpc_event_engine::experimental::SelfDeletingClosure::Run() /proc/self/cwd/./src/core/lib/event_engine/common_closures.h:54:5 (xds_cluster_end2end_test@poller=epoll1+0x56a78ad) grpc#25 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::Step() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:552:14 (xds_cluster_end2end_test@poller=epoll1+0x56a4663) grpc#26 grpc_event_engine::experimental::WorkStealingThreadPool::ThreadState::ThreadBody() /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:515:10 (xds_cluster_end2end_test@poller=epoll1+0x56a3d47) grpc#27 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::operator()(void*) const /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:277:17 (xds_cluster_end2end_test@poller=epoll1+0x56a5049) grpc#28 grpc_event_engine::experimental::WorkStealingThreadPool::WorkStealingThreadPoolImpl::StartThread()::$_0::__invoke(void*) /proc/self/cwd/src/core/lib/event_engine/thread_pool/work_stealing_thread_pool.cc:275:7 (xds_cluster_end2end_test@poller=epoll1+0x56a4fe9) grpc#29 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::operator()(void*) const /proc/self/cwd/src/core/util/posix/thd.cc:145:11 (xds_cluster_end2end_test@poller=epoll1+0x58479f0) grpc#30 grpc_core::(anonymous namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void (*)(void*), void*, bool*, grpc_core::Thread::Options const&)::'lambda'(void*)::__invoke(void*) /proc/self/cwd/src/core/util/posix/thd.cc:115:9 (xds_cluster_end2end_test@poller=epoll1+0x5847819) ``` Closes grpc#39806 COPYBARA_INTEGRATE_REVIEW=grpc#39806 from markdroth:lrs_server_callback_fix e10ed85 PiperOrigin-RevId: 769289887
1 parent 94ed010 commit ff2b097

2 files changed

Lines changed: 9 additions & 2 deletions

File tree

test/cpp/end2end/xds/xds_server.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,10 @@ LrsServiceImpl::Reactor::Reactor(
438438

439439
void LrsServiceImpl::Reactor::OnReadDone(bool ok) {
440440
if (!ok) return;
441-
if (!seen_first_request_.exchange(true)) {
441+
grpc_core::MutexLock lock(&mu_);
442+
if (finished_) return;
443+
if (!seen_first_request_) {
444+
seen_first_request_ = true;
442445
// Handle initial request.
443446
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
444447
<< this << ": read initial request: " << request_.DebugString();
@@ -491,6 +494,8 @@ void LrsServiceImpl::Reactor::OnDone() {
491494
void LrsServiceImpl::Reactor::OnCancel() {
492495
LOG(INFO) << "LRS[" << lrs_service_impl_->debug_label_ << "]: reactor "
493496
<< this << ": OnCancel()";
497+
grpc_core::MutexLock lock(&mu_);
498+
finished_ = true;
494499
Finish(Status::OK);
495500
}
496501

test/cpp/end2end/xds/xds_server.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,9 @@ class LrsServiceImpl
472472

473473
std::shared_ptr<LrsServiceImpl> lrs_service_impl_;
474474

475-
std::atomic<bool> seen_first_request_{false};
475+
grpc_core::Mutex mu_;
476+
bool seen_first_request_ ABSL_GUARDED_BY(&mu_) = false;
477+
bool finished_ ABSL_GUARDED_BY(&mu_) = false;
476478

477479
LoadStatsRequest request_;
478480
LoadStatsResponse response_;

0 commit comments

Comments
 (0)