What needs to happen?
In ReadFromKafkaDoFn.java, a Guava Stopwatch is currently used to measure the latency of consumer.poll() and report it to the RpcLatency metric.
As noted in an existing TODO comment in the codebase, this timer uses System.nanoTime(). When a consumer has prefetches waiting to be returned immediately, the overhead of System.nanoTime() can contribute more latency than it actually measures (see nanotrusting-nanotime).
To fix this, we should:
- Remove the pollTimer (Stopwatch) from the while loop in ReadFromKafkaDoFn.
- Stop manually reporting to updateSuccessfulRpcMetrics in this hot path, and instead rely on Kafka's native fetch-latency-avg JMX metric, which users can already monitor.
- Replace the remainingTimeout calculation with a lower-overhead System.currentTimeMillis() check to ensure we still respect the consumerPollingTimeout without the nanoTime penalty.
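The sketch below shows what a deadline-based loop could look like once the Stopwatch is gone. It assumes a simplified loop that returns as soon as any records arrive or the polling timeout elapses. Poller and pollWithDeadline are hypothetical names standing in for KafkaConsumer.poll(Duration) and the loop in ReadFromKafkaDoFn. The real method likely has extra exit conditions, so this is an illustration of the technique, not a drop-in patch.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class PollLoopSketch {

  /** Hypothetical stand-in for KafkaConsumer.poll(Duration); not a Kafka API. */
  interface Poller {
    List<String> poll(Duration timeout);
  }

  /**
   * Polls until records arrive or consumerPollingTimeout elapses. The deadline is
   * computed once with System.currentTimeMillis() instead of timing each poll
   * with a nanoTime-based Stopwatch, and no per-poll latency metric is reported.
   */
  static List<String> pollWithDeadline(Poller poller, Duration consumerPollingTimeout) {
    final long deadlineMillis = System.currentTimeMillis() + consumerPollingTimeout.toMillis();
    while (true) {
      // Remaining budget for this poll; stop once the deadline has passed.
      long remainingMillis = deadlineMillis - System.currentTimeMillis();
      if (remainingMillis <= 0) {
        return Collections.emptyList();
      }
      List<String> records = poller.poll(Duration.ofMillis(remainingMillis));
      if (!records.isEmpty()) {
        return records;
      }
    }
  }

  public static void main(String[] args) {
    // Fake poller that returns nothing twice, then one record on the third call.
    int[] calls = {0};
    Poller poller =
        timeout ->
            ++calls[0] < 3
                ? Collections.<String>emptyList()
                : Collections.singletonList("record-1");
    System.out.println(pollWithDeadline(poller, Duration.ofSeconds(2))); // prints [record-1]
  }
}
```

One trade-off to keep in mind: System.currentTimeMillis() reads the wall clock, which is not guaranteed to be monotonic, so a clock adjustment could shorten or lengthen a single polling window. For a polling timeout this seems acceptable, but it is a behavioral difference from the nanoTime-based version.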
Issue Priority
Priority: 2 (default / most normal work should be filed as P2)
Issue Components