Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

### Bugs Fixed

- [[#6957]](https://github.com/Azure/azure-sdk-for-cpp/issues/6957) Catch and log exceptions thrown from `Close()` inside the `ProducerClient`, `ConsumerClient`, and `PartitionClient` destructors so cleanup failures no longer terminate the program via `std::terminate`.
- `ProducerClient::Close` and `ConsumerClient::Close` now perform best-effort teardown: a throw from one step no longer skips remaining cleanup. The first exception encountered is rethrown to the caller after every step has been attempted.

### Other Changes

## 1.0.0-beta.10 (2024-11-01)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {
/** Move a consumer client */
ConsumerClient& operator=(ConsumerClient&& other) = delete;

~ConsumerClient();
~ConsumerClient() noexcept;

/** @brief Getter for event hub name
*
Expand Down Expand Up @@ -158,6 +158,10 @@ namespace Azure { namespace Messaging { namespace EventHubs {
* partition clients.
*
* @param context The context for the operation can be used for request cancellation.
*
* @remark Performs best-effort teardown: if one step throws, the remaining
* steps are still attempted. The first exception encountered is rethrown
* to the caller after all steps have run.
*/
void Close(Azure::Core::Context const& context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ namespace Azure { namespace Messaging { namespace EventHubs {

/** Destroy this partition client.
*/
virtual ~PartitionClient();
virtual ~PartitionClient() noexcept;

/** Receive events from the partition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,15 @@ namespace Azure { namespace Messaging { namespace EventHubs {
/** Default Constructor for a ProducerClient */
ProducerClient() = default;

~ProducerClient() { Close(); }
~ProducerClient() noexcept;

/** @brief Close all the connections and sessions.
*
* @param context Context for the operation can be used for request cancellation.
*
* @remark Performs best-effort teardown: if one step throws, the remaining
* steps are still attempted. The first exception encountered is rethrown
* to the caller after all steps have run.
*/
void Close(Azure::Core::Context const& context = {});

Expand Down
107 changes: 89 additions & 18 deletions sdk/eventhubs/azure-messaging-eventhubs/src/consumer_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,54 +28,125 @@ namespace Azure { namespace Messaging { namespace EventHubs {
+ _detail::EventHubsConsumerGroupsPath + m_consumerGroup;
}

ConsumerClient::~ConsumerClient()
ConsumerClient::~ConsumerClient() noexcept
{
Log::Stream(Logger::Level::Informational) << "Destroy consumer client.";
try
{
Log::Stream(Logger::Level::Informational) << "Destroy consumer client.";
}
catch (...)
{
}

Close({});
try
{
Close({});
}
catch (std::exception const& e)
{
try
{
Log::Stream(Logger::Level::Warning)
<< "~ConsumerClient(): exception thrown during Close(): " << e.what();
}
catch (...)
{
}
}
Comment on lines +45 to +55
catch (...)
{
try
{
Log::Stream(Logger::Level::Warning)
<< "~ConsumerClient(): unknown exception thrown during Close().";
}
catch (...)
{
}
}
}

void ConsumerClient::Close(Azure::Core::Context const& context)
{
Log::Stream(Logger::Level::Verbose) << "Close producer client.";
Log::Stream(Logger::Level::Verbose) << "Close consumer client.";

// Best-effort teardown: a throw from one step must not skip the rest. The
// first exception encountered is captured and rethrown after every step
// has been attempted.
std::exception_ptr firstException;
auto attemptStep
= [&firstException](auto&& step, char const* description) noexcept {
try
{
step();
}
catch (std::exception const& e)
{
if (!firstException)
{
firstException = std::current_exception();
}
try
{
Log::Stream(Logger::Level::Warning)
<< "ConsumerClient::Close: " << description << " failed: " << e.what();
}
catch (...)
{
}
}
catch (...)
{
if (!firstException)
{
firstException = std::current_exception();
}
try
{
Log::Stream(Logger::Level::Warning) << "ConsumerClient::Close: " << description
<< " failed with unknown exception.";
}
catch (...)
{
}
}
};

{
std::unique_lock<std::mutex> lock(m_propertiesClientLock);
if (m_propertiesClient)
{
m_propertiesClient->Close(context);
attemptStep([&] { m_propertiesClient->Close(context); }, "properties client");
m_propertiesClient.reset();
}
}
Log::Stream(Logger::Level::Verbose) << "Closing message senders.";
// Tear down the sessions and then the connections, in that order.
Log::Stream(Logger::Level::Verbose) << "Closing message receivers.";
for (auto& receiver : m_receivers)
{
receiver.second.Close(context);
attemptStep([&] { receiver.second.Close(context); }, "message receiver");
}

#if ENABLE_RUST_AMQP
Log::Stream(Logger::Level::Verbose) << "Closing sessions.";
for (auto& session : m_sessions)
{
session.second.End(context);
attemptStep([&] { session.second.End(context); }, "session");
}
Log::Stream(Logger::Level::Verbose) << "Closing connections.";
for (auto& connection : m_connections)
{
connection.second.Close(context);
attemptStep([&] { connection.second.Close(context); }, "connection");
}
#endif

while (!m_sessions.empty())
{
m_sessions.erase(m_sessions.begin());
}
m_sessions.clear();
m_connections.clear();
m_receivers.clear();

while (!m_connections.empty())
if (firstException)
{
m_connections.erase(m_connections.begin());
};
m_receivers.clear();
std::rethrow_exception(firstException);
}
}

Azure::Core::Amqp::_internal::Connection ConsumerClient::CreateConnection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,43 @@ namespace Azure { namespace Messaging { namespace EventHubs {
{
}

PartitionClient::~PartitionClient()
PartitionClient::~PartitionClient() noexcept
{
Log::Stream(Logger::Level::Verbose) << "~PartitionClient() "
<< "Close Receiver.";
m_receiver.Close();
try
{
Log::Stream(Logger::Level::Verbose) << "~PartitionClient() "
<< "Close Receiver.";
}
catch (...)
{
}

try
{
m_receiver.Close();
}
catch (std::exception const& e)
{
try
{
Log::Stream(Logger::Level::Warning)
<< "~PartitionClient(): exception thrown during m_receiver.Close(): " << e.what();
}
catch (...)
{
}
}
Comment on lines +226 to +236
catch (...)
{
try
{
Log::Stream(Logger::Level::Warning)
<< "~PartitionClient(): unknown exception thrown during m_receiver.Close().";
}
catch (...)
{
}
}
}

/** Receive events from the partition.
Expand Down
86 changes: 82 additions & 4 deletions sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,117 @@ namespace Azure { namespace Messaging { namespace EventHubs {
{
}

ProducerClient::~ProducerClient() noexcept
{
try
{
Close();
}
catch (std::exception const& e)
{
try
{
Log::Stream(Logger::Level::Warning)
<< "~ProducerClient(): exception thrown during Close(): " << e.what();
}
catch (...)
{
}
}
catch (...)
{
try
{
Log::Stream(Logger::Level::Warning)
<< "~ProducerClient(): unknown exception thrown during Close().";
}
catch (...)
{
}
Comment on lines +38 to +62
}
}

void ProducerClient::Close(Azure::Core::Context const& context)
{
Log::Stream(Logger::Level::Verbose) << "Close producer client.";

// Best-effort teardown: a throw from one step must not skip the rest. The
// first exception encountered is captured and rethrown after every step
// has been attempted.
std::exception_ptr firstException;
auto attemptStep
= [&firstException](auto&& step, char const* description) noexcept {
try
{
step();
}
catch (std::exception const& e)
{
if (!firstException)
{
firstException = std::current_exception();
}
try
{
Log::Stream(Logger::Level::Warning)
<< "ProducerClient::Close: " << description << " failed: " << e.what();
}
catch (...)
{
}
}
catch (...)
{
if (!firstException)
{
firstException = std::current_exception();
}
try
{
Log::Stream(Logger::Level::Warning) << "ProducerClient::Close: " << description
<< " failed with unknown exception.";
}
catch (...)
{
}
}
};

{
std::unique_lock<std::mutex> lock(m_propertiesClientLock);
if (m_propertiesClient)
{
m_propertiesClient->Close(context);
attemptStep([&] { m_propertiesClient->Close(context); }, "properties client");
m_propertiesClient.reset();
}
}
Log::Stream(Logger::Level::Verbose) << "Closing message senders.";
for (auto& sender : m_senders)
{
sender.second.Close(context);
attemptStep([&] { sender.second.Close(context); }, "message sender");
}
m_senders.clear();

#if ENABLE_RUST_AMQP
Log::Stream(Logger::Level::Verbose) << "Closing sessions.";
for (auto& session : m_sessions)
{
session.second.End(context);
attemptStep([&] { session.second.End(context); }, "session");
}
Log::Stream(Logger::Level::Verbose) << "Closing connections.";
for (auto& connection : m_connections)
{
connection.second.Close(context);
attemptStep([&] { connection.second.Close(context); }, "connection");
}
#endif
// Remove all the sessions and connections after they've been closed.
m_sessions.clear();
m_connections.clear();

if (firstException)
{
std::rethrow_exception(firstException);
}
}

EventDataBatch ProducerClient::CreateBatch(
Expand Down
Loading