Skip to content

Commit bbc2bbf

Browse files
committed
NMS-12987: report currentTimeMillis - flow.lastSwitched as kafka_drift (watermarks are also based on lastSwitched)
1 parent ff85cb5 commit bbc2bbf

1 file changed

Lines changed: 2 additions & 1 deletion

File tree

main/src/main/java/org/opennms/nephron/Pipeline.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ public static class ReadFromKafka extends PTransform<PBegin, PCollection<FlowDoc
340340
private final String topic;
341341
private final Map<String, Object> kafkaConsumerConfig;
342342

343+
// metric name: flink_taskmanager_job_task_operator_flows_from_kafka_drift
343344
private final Counter flowsFromKafka = Metrics.counter("flows", "from_kafka");
344345
// a distribution would be more interesting for from_kafka_drift
345346
// -> Unfortunately histograms are not supported Beam/Flink/Prometheus
@@ -379,7 +380,7 @@ public void processElement(ProcessContext c) {
379380

380381
// Metrics
381382
flowsFromKafka.inc();
382-
flowsFromKafkaDrift.set(System.currentTimeMillis() - flow.getTimestamp());
383+
flowsFromKafkaDrift.set(System.currentTimeMillis() - flow.getLastSwitched().getValue());
383384
}
384385
}));
385386
}

0 commit comments

Comments
 (0)