diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index c721bf7e64..41f0a841d5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -11,6 +11,9 @@ ### Bugs Fixed +- [[#6957]](https://github.com/Azure/azure-sdk-for-cpp/issues/6957) Catch and log exceptions thrown from `Close()` inside the `ProducerClient`, `ConsumerClient`, and `PartitionClient` destructors so cleanup failures no longer terminate the program via `std::terminate`. +- `ProducerClient::Close` and `ConsumerClient::Close` now perform best-effort teardown: a throw from one step no longer skips remaining cleanup. The first exception encountered is rethrown to the caller after every step has been attempted. + ### Other Changes ## 1.0.0-beta.10 (2024-11-01) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp index 98daf20a93..61721edf90 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/consumer_client.hpp @@ -106,7 +106,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { /** Move a consumer client */ ConsumerClient& operator=(ConsumerClient&& other) = delete; - ~ConsumerClient(); + ~ConsumerClient() noexcept; /** @brief Getter for event hub name * @@ -158,6 +158,10 @@ namespace Azure { namespace Messaging { namespace EventHubs { * partition clients. * * @param context The context for the operation can be used for request cancellation. + * + * @remark Performs best-effort teardown: if one step throws, the remaining + * steps are still attempted. The first exception encountered is rethrown + * to the caller after all steps have run. */ void Close(Azure::Core::Context const& context); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp index 501bb16815..73f8d463a9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/partition_client.hpp @@ -70,7 +70,7 @@ namespace Azure { namespace Messaging { namespace EventHubs { /** Destroy this partition client. */ - virtual ~PartitionClient(); + virtual ~PartitionClient() noexcept; /** Receive events from the partition. * diff --git a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp index b10d935999..3f5f249e99 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/inc/azure/messaging/eventhubs/producer_client.hpp @@ -98,11 +98,15 @@ namespace Azure { namespace Messaging { namespace EventHubs { /** Default Constructor for a ProducerClient */ ProducerClient() = default; - ~ProducerClient() { Close(); } + ~ProducerClient() noexcept; /** @brief Close all the connections and sessions. * * @param context Context for the operation can be used for request cancellation. + * + * @remark Performs best-effort teardown: if one step throws, the remaining + * steps are still attempted. The first exception encountered is rethrown + * to the caller after all steps have run. */ void Close(Azure::Core::Context const& context = {}); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp index 02fed577c1..5a1ae8d9c0 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp @@ -28,54 +28,125 @@ namespace Azure { namespace Messaging { namespace EventHubs { + _detail::EventHubsConsumerGroupsPath + m_consumerGroup; } - ConsumerClient::~ConsumerClient() + ConsumerClient::~ConsumerClient() noexcept { - Log::Stream(Logger::Level::Informational) << "Destroy consumer client."; + try + { + Log::Stream(Logger::Level::Informational) << "Destroy consumer client."; + } + catch (...) + { + } - Close({}); + try + { + Close({}); + } + catch (std::exception const& e) + { + try + { + Log::Stream(Logger::Level::Warning) + << "~ConsumerClient(): exception thrown during Close(): " << e.what(); + } + catch (...) + { + } + } + catch (...) + { + try + { + Log::Stream(Logger::Level::Warning) + << "~ConsumerClient(): unknown exception thrown during Close()."; + } + catch (...) + { + } + } } void ConsumerClient::Close(Azure::Core::Context const& context) { - Log::Stream(Logger::Level::Verbose) << "Close producer client."; + Log::Stream(Logger::Level::Verbose) << "Close consumer client."; + + // Best-effort teardown: a throw from one step must not skip the rest. The + // first exception encountered is captured and rethrown after every step + // has been attempted. + std::exception_ptr firstException; + auto attemptStep + = [&firstException](auto&& step, char const* description) noexcept { + try + { + step(); + } + catch (std::exception const& e) + { + if (!firstException) + { + firstException = std::current_exception(); + } + try + { + Log::Stream(Logger::Level::Warning) + << "ConsumerClient::Close: " << description << " failed: " << e.what(); + } + catch (...) + { + } + } + catch (...) + { + if (!firstException) + { + firstException = std::current_exception(); + } + try + { + Log::Stream(Logger::Level::Warning) << "ConsumerClient::Close: " << description + << " failed with unknown exception."; + } + catch (...) + { + } + } + }; + { std::unique_lock lock(m_propertiesClientLock); if (m_propertiesClient) { - m_propertiesClient->Close(context); + attemptStep([&] { m_propertiesClient->Close(context); }, "properties client"); m_propertiesClient.reset(); } } - Log::Stream(Logger::Level::Verbose) << "Closing message senders."; - // Tear down the sessions and then the connections, in that order. + Log::Stream(Logger::Level::Verbose) << "Closing message receivers."; for (auto& receiver : m_receivers) { - receiver.second.Close(context); + attemptStep([&] { receiver.second.Close(context); }, "message receiver"); } #if ENABLE_RUST_AMQP Log::Stream(Logger::Level::Verbose) << "Closing sessions."; for (auto& session : m_sessions) { - session.second.End(context); + attemptStep([&] { session.second.End(context); }, "session"); } Log::Stream(Logger::Level::Verbose) << "Closing connections."; for (auto& connection : m_connections) { - connection.second.Close(context); + attemptStep([&] { connection.second.Close(context); }, "connection"); } #endif - while (!m_sessions.empty()) - { - m_sessions.erase(m_sessions.begin()); - } + m_sessions.clear(); + m_connections.clear(); + m_receivers.clear(); - while (!m_connections.empty()) + if (firstException) { - m_connections.erase(m_connections.begin()); - }; - m_receivers.clear(); + std::rethrow_exception(firstException); + } } Azure::Core::Amqp::_internal::Connection ConsumerClient::CreateConnection( diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp index af734ecbdc..4466ec4680 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/partition_client.cpp @@ -208,11 +208,43 @@ namespace Azure { namespace Messaging { namespace EventHubs { { } - PartitionClient::~PartitionClient() + PartitionClient::~PartitionClient() noexcept { - Log::Stream(Logger::Level::Verbose) << "~PartitionClient() " - << "Close Receiver."; - m_receiver.Close(); + try + { + Log::Stream(Logger::Level::Verbose) << "~PartitionClient() " + << "Close Receiver."; + } + catch (...) + { + } + + try + { + m_receiver.Close(); + } + catch (std::exception const& e) + { + try + { + Log::Stream(Logger::Level::Warning) + << "~PartitionClient(): exception thrown during m_receiver.Close(): " << e.what(); + } + catch (...) + { + } + } + catch (...) + { + try + { + Log::Stream(Logger::Level::Warning) + << "~PartitionClient(): unknown exception thrown during m_receiver.Close()."; + } + catch (...) + { + } + } } /** Receive events from the partition. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp b/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp index e14c63088c..06f63dc702 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp @@ -33,21 +33,94 @@ namespace Azure { namespace Messaging { namespace EventHubs { { } + ProducerClient::~ProducerClient() noexcept + { + try + { + Close(); + } + catch (std::exception const& e) + { + try + { + Log::Stream(Logger::Level::Warning) + << "~ProducerClient(): exception thrown during Close(): " << e.what(); + } + catch (...) + { + } + } + catch (...) + { + try + { + Log::Stream(Logger::Level::Warning) + << "~ProducerClient(): unknown exception thrown during Close()."; + } + catch (...) + { + } + } + } + void ProducerClient::Close(Azure::Core::Context const& context) { Log::Stream(Logger::Level::Verbose) << "Close producer client."; + + // Best-effort teardown: a throw from one step must not skip the rest. The + // first exception encountered is captured and rethrown after every step + // has been attempted. + std::exception_ptr firstException; + auto attemptStep + = [&firstException](auto&& step, char const* description) noexcept { + try + { + step(); + } + catch (std::exception const& e) + { + if (!firstException) + { + firstException = std::current_exception(); + } + try + { + Log::Stream(Logger::Level::Warning) + << "ProducerClient::Close: " << description << " failed: " << e.what(); + } + catch (...) + { + } + } + catch (...) + { + if (!firstException) + { + firstException = std::current_exception(); + } + try + { + Log::Stream(Logger::Level::Warning) << "ProducerClient::Close: " << description + << " failed with unknown exception."; + } + catch (...) + { + } + } + }; + { std::unique_lock lock(m_propertiesClientLock); if (m_propertiesClient) { - m_propertiesClient->Close(context); + attemptStep([&] { m_propertiesClient->Close(context); }, "properties client"); m_propertiesClient.reset(); } } Log::Stream(Logger::Level::Verbose) << "Closing message senders."; for (auto& sender : m_senders) { - sender.second.Close(context); + attemptStep([&] { sender.second.Close(context); }, "message sender"); } m_senders.clear(); @@ -55,17 +128,22 @@ namespace Azure { namespace Messaging { namespace EventHubs { Log::Stream(Logger::Level::Verbose) << "Closing sessions."; for (auto& session : m_sessions) { - session.second.End(context); + attemptStep([&] { session.second.End(context); }, "session"); } Log::Stream(Logger::Level::Verbose) << "Closing connections."; for (auto& connection : m_connections) { - connection.second.Close(context); + attemptStep([&] { connection.second.Close(context); }, "connection"); } #endif // Remove all the sessions and connections after they've been closed. m_sessions.clear(); m_connections.clear(); + + if (firstException) + { + std::rethrow_exception(firstException); + } } EventDataBatch ProducerClient::CreateBatch(