First file:

@@ -53,7 +53,6 @@
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closeables;
@@ -411,7 +410,6 @@ public boolean offsetBasedDeduplicationSupported() {
   private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

   private KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
-  private Stopwatch stopwatch = Stopwatch.createUnstarted();

   private Set<String> kafkaTopics;
@@ -580,13 +578,12 @@ private void consumerPollLoop() {
     while (!closed.get()) {
       try {
         if (records.isEmpty()) {
-          stopwatch.start();
+          long startMillis = System.currentTimeMillis();
           records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
-          stopwatch.stop();
+          long elapsedMillis = System.currentTimeMillis() - startMillis;
           for (String kafkaTopic : kafkaTopics) {
             kafkaResults.updateSuccessfulRpcMetrics(
-                kafkaTopic,
-                java.time.Duration.ofMillis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
+                kafkaTopic, java.time.Duration.ofMillis(elapsedMillis));
           }
         } else if (availableRecordsQueue.offer(
             records, RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS)) {
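The hunk above swaps the long-lived Guava Stopwatch field for two wall-clock reads around the blocking poll. A minimal, self-contained sketch of the new pattern — `simulatedPoll` is a hypothetical stand-in for `consumer.poll(...)`, not part of the PR:

```java
import java.time.Duration;

public class PollTimingSketch {
    // Hypothetical stand-in for consumer.poll(...): blocks briefly like a real poll would.
    static void simulatedPoll() throws InterruptedException {
        Thread.sleep(10L);
    }

    public static void main(String[] args) throws InterruptedException {
        // Pattern from the diff: bracket the blocking call with two
        // System.currentTimeMillis() reads instead of a Stopwatch field.
        long startMillis = System.currentTimeMillis();
        simulatedPoll();
        long elapsedMillis = System.currentTimeMillis() - startMillis;
        // The metrics call in the diff takes a java.time.Duration.
        Duration latency = Duration.ofMillis(elapsedMillis);
        System.out.println("nonNegative=" + (latency.toMillis() >= 0));
    }
}
```

This avoids keeping mutable Stopwatch state on the reader instance; the trade-offs of the two clocks are discussed in the review thread below the second file.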
Second file:

@@ -58,7 +58,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
@@ -567,19 +566,14 @@ public ProcessContinuation processElement(
     long expectedOffset = tracker.currentRestriction().getFrom();
     consumer.resume(Collections.singleton(topicPartition));
     consumer.seek(topicPartition, expectedOffset);
-    final Stopwatch pollTimer = Stopwatch.createUnstarted();

     final KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
     try {
       while (Duration.ZERO.compareTo(remainingTimeout) < 0) {
         // TODO: Remove this timer and use the existing fetch-latency-avg metric.
Review comment (Contributor), on the TODO line above:
The TODO is still valid. It's referring to Kafka metrics: https://kafka.apache.org/20/generated/consumer_metrics.html

Stopwatch and System.currentTimeMillis don't make a difference here

Reply (Contributor Author):

Thanks for the clarification, and I think there was some misunderstanding on my side about the intent of the TODO.

I originally read

“Remove this timer and use the existing fetch‑latency‑avg metric.”

as “the custom Stopwatch‑based timer in the hot consumer.poll() loop is too expensive and should be removed / replaced,” so I focused this PR on changing how we measure the latency (from Stopwatch to System.currentTimeMillis) while keeping the existing Beam metric behavior.

From your comments and the link to the Kafka consumer metrics docs, I now understand that your intent is more about relying on Kafka’s own fetch-latency-avg JMX metric instead of having a separate Beam RpcLatency metric here, rather than just which Java clock is used. In that sense, you’re right: this PR doesn’t actually implement the TODO, it just refactors the timer and doesn’t add much value.

Given that, I’m happy to either:

  • Close this PR as a misaligned optimization attempt, or
  • If you think it’s worthwhile, follow up with a separate change that truly removes the custom latency timer / RpcLatency metric in favor of Kafka’s metrics (with tests and a clearer design discussion up front).

Please let me know which direction you’d prefer; if you’d rather keep the current behavior and leave the TODO for later, I’ll go ahead and close this PR.
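As a hedged sketch of the direction the reviewer points at — relying on the Kafka client's own `fetch-latency-avg` metric instead of a separate Beam-side timer — one could look it up via `KafkaConsumer.metrics()`. The broker address and group id below are illustrative, and this assumes `kafka-clients` is on the classpath; it only checks that the metric is registered, since its value is meaningless before any fetch:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class FetchLatencySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address; no connection is made here
        props.put("group.id", "metrics-demo");            // illustrative group id

        try (KafkaConsumer<byte[], byte[]> consumer =
                new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            // consumer.metrics() exposes the client's built-in metrics, which include
            // fetch-latency-avg in the consumer-fetch-manager-metrics group.
            boolean found = consumer.metrics().keySet().stream()
                .anyMatch(name -> name.name().equals("fetch-latency-avg"));
            System.out.println("fetch-latency-avg registered: " + found);
        }
    }
}
```

Implementing the TODO along these lines would mean reading this client metric (or its JMX counterpart) rather than bracketing `consumer.poll` with any timer at all.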

         // A consumer will often have prefetches waiting to be returned immediately in which case
         // this timer may contribute more latency than it measures.
         // See https://shipilev.net/blog/2014/nanotrusting-nanotime/ for more information.
-        pollTimer.reset().start();
+        long startMillis = System.currentTimeMillis();
         // Fetch the next records.
         final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(remainingTimeout);
-        final Duration elapsed = pollTimer.elapsed();
+        final Duration elapsed = Duration.ofMillis(System.currentTimeMillis() - startMillis);
         try {
           remainingTimeout = remainingTimeout.minus(elapsed);
         } catch (ArithmeticException e) {
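One caveat not raised in the thread: `System.currentTimeMillis()` is wall-clock time and can step backwards under NTP adjustment, while `System.nanoTime()` is monotonic, which matters when the elapsed value is subtracted from `remainingTimeout` as above. A hedged sketch of the monotonic alternative:

```java
public class MonotonicElapsedSketch {
    public static void main(String[] args) throws InterruptedException {
        // nanoTime() values are only meaningful as differences between two reads
        // on the same JVM, but those differences never go negative.
        long startNanos = System.nanoTime();
        Thread.sleep(5L);
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        System.out.println("elapsedPositive=" + (elapsedMillis >= 1));
    }
}
```

Guava's Stopwatch uses `nanoTime()` internally, so the original code had this property; the `currentTimeMillis` version trades it away for one less object and a simpler hot loop.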