Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- [[#7130]](https://github.com/Azure/azure-sdk-for-cpp/issues/7130) Fixed `RetryOperation::Execute` silently swallowing the final exception when every retry attempt threw, which caused `ProducerClient::Send` to return without delivering the batch and without surfacing the underlying failure. The last exception is now rethrown when retries are exhausted, and `ProducerClient::Send` throws if `Execute` ever reports failure as a defense in depth.

### Other Changes

## 1.0.0-beta.11 (2026-05-14)
Expand Down
41 changes: 24 additions & 17 deletions sdk/eventhubs/azure-messaging-eventhubs/src/producer_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,26 +103,33 @@ namespace Azure { namespace Messaging { namespace EventHubs {

Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(
m_producerClientOptions.RetryOptions);
retryOp.Execute([&]() -> bool {
auto result = GetSender(eventDataBatch.GetPartitionId()).Send(message, context);
// Defense in depth: RetryOperation::Execute rethrows the last exception when retries
// are exhausted, but if the lambda ever returns false directly the batch must not be
// silently dropped. See issue #7130.
if (!retryOp.Execute([&]() -> bool {
auto result = GetSender(eventDataBatch.GetPartitionId()).Send(message, context);
#if ENABLE_UAMQP
auto sendStatus = std::get<0>(result);
if (sendStatus == Azure::Core::Amqp::_internal::MessageSendStatus::Ok)
{
return true;
}
// Throw an exception about the error we just received.
throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory::
CreateEventHubsException(std::get<1>(result));
auto sendStatus = std::get<0>(result);
if (sendStatus == Azure::Core::Amqp::_internal::MessageSendStatus::Ok)
{
return true;
}
// Throw an exception about the error we just received.
throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory::
CreateEventHubsException(std::get<1>(result));
#elif ENABLE_RUST_AMQP
if (result)
{
throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory::
CreateEventHubsException(result);
}
return true;
if (result)
{
throw Azure::Messaging::EventHubs::_detail::EventHubsExceptionFactory::
CreateEventHubsException(result);
}
return true;
#endif
});
}))
{
throw Azure::Messaging::EventHubs::EventHubsException(
"ProducerClient::Send failed after exhausting all retry attempts.");
Comment on lines +130 to +131
}
}

void ProducerClient::Send(Models::EventData const& eventData, Core::Context const& context)
Expand Down
10 changes: 10 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/src/retry_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ bool Azure::Messaging::EventHubs::_detail::RetryOperation::Execute(std::function
using Azure::Core::Diagnostics::Logger;
using Azure::Core::Diagnostics::_internal::Log;
int retryCount = 0;
// Capture the most recent exception so a retries-exhausted fallthrough can
// rethrow it instead of silently returning false. See issue #7130.
std::exception_ptr lastException;
while (retryCount < m_retryOptions.MaxRetries)
Comment on lines 31 to 35
{
std::chrono::milliseconds retryAfter{};
Expand All @@ -39,6 +42,7 @@ bool Azure::Messaging::EventHubs::_detail::RetryOperation::Execute(std::function

if (ShouldRetry(result, retryCount, retryAfter))
{
lastException = nullptr;
retryCount++;
std::this_thread::sleep_for(retryAfter);
}
Expand All @@ -56,6 +60,7 @@ bool Azure::Messaging::EventHubs::_detail::RetryOperation::Execute(std::function
}
if (ShouldRetry(IsFatalException(e), retryCount, retryAfter))
{
lastException = std::current_exception();
retryCount++;
std::this_thread::sleep_for(retryAfter);
}
Expand All @@ -73,6 +78,7 @@ bool Azure::Messaging::EventHubs::_detail::RetryOperation::Execute(std::function
// We assume that all exceptions other than EventHubs exceptions might be retriable.
if (ShouldRetry(false, retryCount, retryAfter))
{
lastException = std::current_exception();
retryCount++;
std::this_thread::sleep_for(retryAfter);
}
Expand All @@ -82,6 +88,10 @@ bool Azure::Messaging::EventHubs::_detail::RetryOperation::Execute(std::function
}
}
}
if (lastException)
{
std::rethrow_exception(lastException);
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ namespace LocalTest {
bool testFunc() { return true; }
bool testNegative() { return false; }
Azure::Core::Http::Policies::RetryOptions retryOptions;

// Fast retry options keep regression tests for issue #7130 quick; the production
// defaults (3 retries, 800ms base delay) would add several seconds of backoff per test.
Azure::Core::Http::Policies::RetryOptions MakeFastRetryOptions(int32_t maxRetries = 3)
{
Azure::Core::Http::Policies::RetryOptions opts;
opts.MaxRetries = maxRetries;
opts.RetryDelay = std::chrono::milliseconds(1);
opts.MaxRetryDelay = std::chrono::milliseconds(2);
return opts;
}
} // namespace LocalTest

namespace Azure { namespace Messaging { namespace EventHubs { namespace _internal { namespace Test {
Expand Down Expand Up @@ -62,4 +73,112 @@ namespace Azure { namespace Messaging { namespace EventHubs { namespace _interna
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(LocalTest::retryOptions);
EXPECT_FALSE(retryOp.ShouldRetry(false, LocalTest::retryOptions.MaxRetries, retryAfter));
}

// Regression tests for issue #7130: RetryOperation::Execute must rethrow the last
// exception when all retry attempts have been exhausted; previously it returned false
// and silently dropped the failure.
TEST_F(RetryOperationTest, RethrowsLastEventHubsExceptionWhenRetriesExhausted)
{
auto opts = LocalTest::MakeFastRetryOptions(3);
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(opts);

int callCount = 0;
auto alwaysThrows = [&callCount]() -> bool {
++callCount;
throw Azure::Messaging::EventHubs::EventHubsException(
"transient failure attempt " + std::to_string(callCount));
};

try
{
retryOp.Execute(alwaysThrows);
FAIL() << "Expected EventHubsException to be rethrown after retries were exhausted.";
}
catch (Azure::Messaging::EventHubs::EventHubsException const& e)
{
EXPECT_STREQ("transient failure attempt 3", e.what());
}
EXPECT_EQ(3, callCount);
}

TEST_F(RetryOperationTest, RethrowsLastStdExceptionWhenRetriesExhausted)
{
auto opts = LocalTest::MakeFastRetryOptions(2);
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(opts);

int callCount = 0;
auto alwaysThrows = [&callCount]() -> bool {
++callCount;
throw std::runtime_error("network blip " + std::to_string(callCount));
};

try
{
retryOp.Execute(alwaysThrows);
FAIL() << "Expected std::runtime_error to be rethrown after retries were exhausted.";
}
catch (std::runtime_error const& e)
{
EXPECT_STREQ("network blip 2", e.what());
}
EXPECT_EQ(2, callCount);
}

TEST_F(RetryOperationTest, ThrowsImmediatelyOnFatalEventHubsException)
{
auto opts = LocalTest::MakeFastRetryOptions(5);
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(opts);

int callCount = 0;
auto throwsFatal = [&callCount]() -> bool {
++callCount;
Azure::Messaging::EventHubs::EventHubsException ex("message too big");
ex.ErrorCondition = "amqp:link:message-size-exceeded";
throw ex;
};

EXPECT_THROW(retryOp.Execute(throwsFatal), Azure::Messaging::EventHubs::EventHubsException);
EXPECT_EQ(1, callCount) << "Fatal exception must not be retried.";
}

TEST_F(RetryOperationTest, SucceedsAfterTransientException)
{
auto opts = LocalTest::MakeFastRetryOptions(3);
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(opts);

int callCount = 0;
auto eventuallySucceeds = [&callCount]() -> bool {
++callCount;
if (callCount == 1)
{
throw Azure::Messaging::EventHubs::EventHubsException("first attempt fails");
}
return true;
};

EXPECT_TRUE(retryOp.Execute(eventuallySucceeds));
EXPECT_EQ(2, callCount);
}

TEST_F(RetryOperationTest, FalseAfterTransientExceptionDoesNotRethrow)
{
auto opts = LocalTest::MakeFastRetryOptions(3);
Azure::Messaging::EventHubs::_detail::RetryOperation retryOp(opts);

int callCount = 0;
auto throwsThenReturnsFalse = [&callCount]() -> bool {
++callCount;
if (callCount == 1)
{
throw Azure::Messaging::EventHubs::EventHubsException("first attempt fails");
}
return false;
};

// Second attempt returns false cleanly. ShouldRetry(false=response, retryCount=1)
// returns true, so the loop retries; on attempt 3 the loop terminates and Execute
// returns false. The exception from attempt 1 must not be rethrown.
EXPECT_NO_THROW({ EXPECT_FALSE(retryOp.Execute(throwsThenReturnsFalse)); });
EXPECT_EQ(opts.MaxRetries, callCount);
}
Comment on lines +165 to +183
}}}}} // namespace Azure::Messaging::EventHubs::_internal::Test
Loading