[BAEL-6569] implement the Kafka commit failure handling#19239
[BAEL-6569] implement the Kafka commit failure handling#19239saikatcse03 wants to merge 10 commits into
Conversation
| consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-async-consumer-app"); | ||
| consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||
| consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); | ||
| consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); |
There was a problem hiding this comment.
This should also be earliest otherwise the test can go green for the wrong reason - with a brand-new group + latest, any records produced before the consumer's first partition assignment are skipped. If all 100 are skipped, the consumer processes nothing, throws nothing, and the test goes green without ever exercising the offset-tracking/commit code that this whole section is about.
There was a problem hiding this comment.
changed it to earliest.
| countDownLatch.await(30, TimeUnit.SECONDS); | ||
| assertThat(uncaughtException.get()).doesNotThrowAnyException(); |
There was a problem hiding this comment.
Since this only counts down on an uncaught exception (which never occurs) it will wait the full 30s
The assertion effectively says "no exceptions happened in 30 seconds" which is a weak negative assertion
A stronger positive assertion would be to await a real condition (e.g. committed offset == produced count perhaps?)
There was a problem hiding this comment.
Yes, i changed that negative assertion with actual commit offset assertion with awaitability in all similar methods.
| countDownLatch.await(30, TimeUnit.SECONDS); | ||
| assertThat(uncaughtException.get()).doesNotThrowAnyException(); |
There was a problem hiding this comment.
doesNotThrowAnyException() is not designed to be used in this context, it's designed for use when using assertThatCode(..).doesNotThrowAnyException
It happens to work here (green when null, red otherwise) but it's not idiomatic AssertJ usage
I'd replace with assertThat(uncaughtException.get()).isNull() to be clearer
| Properties consumerProperties = new Properties(); | ||
| consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); | ||
| consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "seq-consumer-app"); | ||
| consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1"); |
There was a problem hiding this comment.
Can we minimise the properties set here to the essential ones to reproduce the issue - e.g. ConsumerConfig.GROUP_INSTANCE_ID_CONFIG is not relevant I think, maybe others too
Let's keep consistent config across the tests and only vary them where something is genuinely different for a specific reason
There was a problem hiding this comment.
Yes, removed above irrelevant property. only this one is to be removed and need to keep all others as mandatory.
| try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) { | ||
| for (int num = 0; num < 300; num++) { | ||
| producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num)); | ||
| producer.flush(); |
There was a problem hiding this comment.
This results in 300 flushes, isn't one at the end of the loop sufficient?
There was a problem hiding this comment.
Both approach works for this test, i wanted to quickly send msgs to consumer instead of waiting for producer to processing as batches, flush sends message immediately. though test works for both scenario.
| try { | ||
| commitOffsets(partitions); | ||
| } catch (Exception ex) { | ||
| log.error("Commit failed during rebalance", ex); |
There was a problem hiding this comment.
Could this mask a CommitFailedException being caught?
There was a problem hiding this comment.
no, in this place CommitFailedException has to only log the failure and no need to rethrow it as that might fail consumer process for other partitions. Anyways the failed committed offset will be polled by other consumer in the group, and no other action required here. Even committing offset during rebalance is not always required and is only best effort.
| if (toCommit.isEmpty()) { | ||
| return; | ||
| } | ||
| consumer.commitSync(toCommit); |
There was a problem hiding this comment.
Can't this throw a CommitFailedException? Both the timeout and deadline are set to 1 second so there is no safety margin
We don't see it at the moment because the workload itself finishes in well under 1 second
There was a problem hiding this comment.
Yes, i did not realize this timeout is same with as with the test poll timeout. reduced it. Ideally, in production code, i would read the config timeout and then derive it here with some buffer.
No description provided.