[BAEL-6569] implement the Kafka commit failure handling #19239

Open
saikatcse03 wants to merge 10 commits into eugenp:master from saikatcse03:BAEL-6569

@saikatcse03

Contributor

No description provided.

consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-async-consumer-app");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

Collaborator

This should also be earliest; otherwise the test can go green for the wrong reason. With a brand-new group and latest, any records produced before the consumer's first partition assignment are skipped. If all 100 are skipped, the consumer processes nothing, throws nothing, and the test passes without ever exercising the offset-tracking and commit code that this whole section is about.

Contributor Author

Changed it to earliest.
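
The difference between the two reset policies can be sketched with a toy model. This is not the Kafka client: `OffsetResetModel`, `startingOffset`, and `visibleRecords` are hypothetical names, and the model assumes a single partition whose 100 records were all produced before the new group's first assignment.

```java
import java.util.OptionalLong;

/** Toy model of how a consumer picks its starting offset; not the Kafka client itself. */
final class OffsetResetModel {

    /**
     * Returns the first offset a consumer would read from one partition.
     * A committed offset always wins; the reset policy applies only when none exists.
     */
    static long startingOffset(String resetPolicy, OptionalLong committed,
                               long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return beginningOffset;
            case "latest":
                return endOffset;
            default:
                // Other policies are outside the scope of this model.
                throw new IllegalArgumentException("Unsupported policy: " + resetPolicy);
        }
    }

    /** Number of already-produced records the consumer will actually see. */
    static long visibleRecords(String resetPolicy, OptionalLong committed,
                               long beginningOffset, long endOffset) {
        return endOffset - startingOffset(resetPolicy, committed, beginningOffset, endOffset);
    }

    public static void main(String[] args) {
        // Brand-new group (no committed offset), 100 records already in the log.
        System.out.println(visibleRecords("latest", OptionalLong.empty(), 0, 100));   // prints 0
        System.out.println(visibleRecords("earliest", OptionalLong.empty(), 0, 100)); // prints 100
    }
}
```

With latest the model consumer sees none of the pre-existing records, which is the situation in which the test would pass without committing anything.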

Comment on lines +48 to +49
countDownLatch.await(30, TimeUnit.SECONDS);
assertThat(uncaughtException.get()).doesNotThrowAnyException();

Collaborator

Since this only counts down on an uncaught exception (which never occurs), it will wait the full 30s.

The assertion effectively says "no exceptions happened in 30 seconds", which is a weak negative assertion.

A stronger positive assertion would be to await a real condition (e.g. committed offset == produced count, perhaps?).

Contributor Author

Yes, I replaced that negative assertion with an assertion on the actual committed offset, using Awaitility, in all similar methods.
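
The idea of awaiting a positive condition instead of a fixed timeout can be sketched in plain Java. This is a hand-rolled stand-in for a library such as Awaitility, not the code from this PR: `AwaitCondition` and `awaitUntil` are hypothetical names, and an `AtomicLong` stands in for the group's committed offset.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

final class AwaitCondition {

    /** Polls the condition until it holds or the timeout elapses; returns whether it held. */
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long producedCount = 100;
        AtomicLong committedOffset = new AtomicLong(); // stand-in for the real committed offset

        // Simulated consumer: advances the committed offset in the background.
        Thread consumer = new Thread(() -> {
            for (long i = 1; i <= producedCount; i++) {
                committedOffset.set(i);
            }
        });
        consumer.start();

        // Returns as soon as the condition holds instead of always waiting 30 seconds.
        boolean reached = awaitUntil(() -> committedOffset.get() == producedCount,
                Duration.ofSeconds(30), Duration.ofMillis(50));
        consumer.join();
        System.out.println("committed offset reached produced count: " + reached);
    }
}
```

Unlike a latch that only counts down on failure, this returns early on success and fails if the expected offset is never committed.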

Comment on lines +48 to +49
countDownLatch.await(30, TimeUnit.SECONDS);
assertThat(uncaughtException.get()).doesNotThrowAnyException();

Collaborator

doesNotThrowAnyException() is not designed for this context; it is meant to be used as assertThatCode(..).doesNotThrowAnyException().

It happens to work here (green when null, red otherwise), but it's not idiomatic AssertJ usage.

I'd replace it with assertThat(uncaughtException.get()).isNull() to be clearer.

Contributor Author

OK, updated.

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "seq-consumer-app");
consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1");

Collaborator

Can we minimise the properties set here to the essential ones needed to reproduce the issue? For example, ConsumerConfig.GROUP_INSTANCE_ID_CONFIG is not relevant, I think, and maybe others too.

Let's keep the config consistent across the tests and only vary it where something is genuinely different for a specific reason.

Contributor Author

Yes, removed the irrelevant property above. Only this one needed to be removed; all the others are mandatory and have to stay.

try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) {
for (int num = 0; num < 300; num++) {
producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num));
producer.flush();

Collaborator

This results in 300 flushes. Isn't one at the end of the loop sufficient?

Contributor Author

Both approaches work for this test. I wanted to send the messages to the consumer quickly instead of waiting for the producer to process them in batches; flush sends the buffered messages immediately. The test works in both scenarios, though.

try {
commitOffsets(partitions);
} catch (Exception ex) {
log.error("Commit failed during rebalance", ex);

Collaborator

Could this mask a CommitFailedException being caught?

Contributor Author

No. In this place a CommitFailedException only needs to be logged; there is no need to rethrow it, as that might fail the consumer's processing of other partitions. In any case, the records whose offset commit failed will be polled again by another consumer in the group, so no other action is required here. Committing offsets during a rebalance is not always required anyway and is only best effort.
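
One way to keep the commit best effort while addressing the masking concern is to catch only the rebalance-related failure and let anything else propagate. The sketch below is an illustration under assumptions, not the PR's code: `BestEffortCommit` is a hypothetical class, and its nested `CommitFailedException` is a stand-in for Kafka's exception of the same name so that the example compiles without the client library.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BestEffortCommit {

    /** Stand-in for Kafka's CommitFailedException (an unchecked exception in the real client). */
    static final class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) {
            super(message);
        }
    }

    // Counts rejected commits so a test can observe them instead of relying on a log line.
    private final AtomicInteger failedCommits = new AtomicInteger();

    /** Runs the commit; swallows only CommitFailedException and lets other exceptions propagate. */
    void commitBestEffort(Runnable commit) {
        try {
            commit.run();
        } catch (CommitFailedException ex) {
            failedCommits.incrementAndGet();
            System.err.println("Commit failed during rebalance: " + ex.getMessage());
        }
    }

    int failedCommits() {
        return failedCommits.get();
    }

    public static void main(String[] args) {
        BestEffortCommit committer = new BestEffortCommit();

        // A rejected commit is logged and counted, and processing continues.
        committer.commitBestEffort(() -> {
            throw new CommitFailedException("group has already rebalanced");
        });
        System.out.println("failed commits: " + committer.failedCommits()); // prints "failed commits: 1"

        // An unrelated failure is not hidden by the handler.
        try {
            committer.commitBestEffort(() -> {
                throw new IllegalStateException("unexpected bug");
            });
        } catch (IllegalStateException ex) {
            System.out.println("propagated: " + ex.getMessage()); // prints "propagated: unexpected bug"
        }
    }
}
```

Narrowing the catch keeps the log-and-continue behaviour described above for commit failures, while unrelated bugs still surface.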

if (toCommit.isEmpty()) {
return;
}
consumer.commitSync(toCommit);

Collaborator

Can't this throw a CommitFailedException? Both the timeout and the deadline are set to 1 second, so there is no safety margin.

We don't see it at the moment because the workload itself finishes in well under 1 second.

@saikatcse03 saikatcse03 Jun 9, 2026

Contributor Author

Yes, I did not realize this timeout was the same as the test poll timeout. Reduced it. Ideally, in production code, I would read the configured timeout and derive this value from it with some buffer.
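
Deriving the commit timeout from a configured deadline with a buffer could look roughly like the sketch below. `CommitTimeouts`, `deriveCommitTimeout`, and the 200 ms margin are hypothetical; which configuration value serves as the deadline is an assumption that depends on the consumer setup. The result would be passed to the KafkaConsumer commitSync overload that accepts a Duration timeout.

```java
import java.time.Duration;

final class CommitTimeouts {

    /**
     * Returns a commit timeout that leaves the given safety margin before the deadline.
     * Rejects margins that would leave no time at all for the commit.
     */
    static Duration deriveCommitTimeout(Duration deadline, Duration safetyMargin) {
        if (safetyMargin.isNegative() || safetyMargin.compareTo(deadline) >= 0) {
            throw new IllegalArgumentException(
                    "Safety margin must be non-negative and smaller than the deadline");
        }
        return deadline.minus(safetyMargin);
    }

    public static void main(String[] args) {
        // Assumed values: a 1 second deadline, as in the test, and a 200 ms buffer.
        Duration commitTimeout = deriveCommitTimeout(Duration.ofSeconds(1), Duration.ofMillis(200));
        System.out.println(commitTimeout.toMillis()); // prints 800
    }
}
```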
