Skip to content

Commit 063f7e8

Browse files
[metric] Support influxdb reporter
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
1 parent 21bcabd commit 063f7e8

12 files changed

Lines changed: 958 additions & 0 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,41 @@ public class ConfigOptions {
19391939
+ "the CoordinatorServer) it is advisable to use a port range "
19401940
+ "like 9990-9999.");
19411941

1942+
// ------------------------------------------------------------------------
1943+
// ConfigOptions for influxdb reporter
1944+
// ------------------------------------------------------------------------
1945+
public static final ConfigOption<String> METRICS_REPORTER_INFLUXDB_HOST_URL =
1946+
key("metrics.reporter.influxdb.host-url")
1947+
.stringType()
1948+
.noDefaultValue()
1949+
.withDescription(
1950+
"The InfluxDB server host URL including scheme, host name, and port.");
1951+
1952+
public static final ConfigOption<String> METRICS_REPORTER_INFLUXDB_BUCKET =
1953+
key("metrics.reporter.influxdb.bucket")
1954+
.stringType()
1955+
.noDefaultValue()
1956+
.withFallbackKeys("metrics.reporter.influxdb.database")
1957+
.withDescription("The InfluxDB bucket/database name.");
1958+
1959+
public static final ConfigOption<String> METRICS_REPORTER_INFLUXDB_ORG =
1960+
key("metrics.reporter.influxdb.org")
1961+
.stringType()
1962+
.noDefaultValue()
1963+
.withDescription("The InfluxDB organization name.");
1964+
1965+
public static final ConfigOption<String> METRICS_REPORTER_INFLUXDB_TOKEN =
1966+
key("metrics.reporter.influxdb.token")
1967+
.stringType()
1968+
.noDefaultValue()
1969+
.withDescription("The InfluxDB authentication token.");
1970+
1971+
public static final ConfigOption<Duration> METRICS_REPORTER_INFLUXDB_PUSH_INTERVAL =
1972+
key("metrics.reporter.influxdb.push-interval")
1973+
.durationType()
1974+
.defaultValue(Duration.ofSeconds(10))
1975+
.withDescription("The interval of reporting metrics to InfluxDB.");
1976+
19421977
// ------------------------------------------------------------------------
19431978
// ConfigOptions for lakehouse storage
19441979
// ------------------------------------------------------------------------
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one
4+
or more contributor license agreements. See the NOTICE file
5+
distributed with this work for additional information
6+
regarding copyright ownership. The ASF licenses this file
7+
to you under the Apache License, Version 2.0 (the
8+
"License"); you may not use this file except in compliance
9+
with the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
-->
19+
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<modelVersion>4.0.0</modelVersion>
23+
<parent>
24+
<groupId>org.apache.fluss</groupId>
25+
<artifactId>fluss-metrics</artifactId>
26+
<version>0.10-SNAPSHOT</version>
27+
</parent>
28+
29+
<artifactId>fluss-metrics-influxdb</artifactId>
30+
<name>Fluss : Metrics : InfluxDB</name>
31+
32+
<properties>
33+
<influxdb3.version>1.8.0</influxdb3.version>
34+
</properties>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.fluss</groupId>
39+
<artifactId>fluss-common</artifactId>
40+
<version>${project.version}</version>
41+
<scope>provided</scope>
42+
</dependency>
43+
44+
<dependency>
45+
<groupId>com.influxdb</groupId>
46+
<artifactId>influxdb3-java</artifactId>
47+
<version>${influxdb3.version}</version>
48+
<scope>provided</scope>
49+
</dependency>
50+
51+
<!-- test dependencies -->
52+
<dependency>
53+
<groupId>org.apache.fluss</groupId>
54+
<artifactId>fluss-test-utils</artifactId>
55+
</dependency>
56+
<dependency>
57+
<groupId>org.apache.fluss</groupId>
58+
<artifactId>fluss-common</artifactId>
59+
<version>${project.version}</version>
60+
<scope>test</scope>
61+
<type>test-jar</type>
62+
</dependency>
63+
</dependencies>
64+
65+
<build>
66+
<plugins>
67+
<plugin>
68+
<groupId>org.apache.maven.plugins</groupId>
69+
<artifactId>maven-compiler-plugin</artifactId>
70+
<configuration>
71+
<!-- compilation of main sources -->
72+
<skipMain>${skip.on.java8}</skipMain>
73+
<!-- compilation of test sources -->
74+
<skip>${skip.on.java8}</skip>
75+
</configuration>
76+
</plugin>
77+
</plugins>
78+
</build>
79+
</project>
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.metrics.influxdb;
20+
21+
import org.apache.fluss.metrics.Counter;
22+
import org.apache.fluss.metrics.Gauge;
23+
import org.apache.fluss.metrics.Histogram;
24+
import org.apache.fluss.metrics.HistogramStatistics;
25+
import org.apache.fluss.metrics.Meter;
26+
import org.apache.fluss.metrics.Metric;
27+
28+
import com.influxdb.v3.client.Point;
29+
30+
import java.time.Instant;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
/** Producer that creates InfluxDB {@link Point Points} from Fluss {@link Metric Metrics}. */
35+
public class InfluxdbPointProducer {
36+
37+
private static final InfluxdbPointProducer INSTANCE = new InfluxdbPointProducer();
38+
39+
public static InfluxdbPointProducer getInstance() {
40+
return INSTANCE;
41+
}
42+
43+
public Point createPoint(
44+
Metric metric, String metricName, List<Map.Entry<String, String>> tags, Instant time) {
45+
Point point = Point.measurement(metricName).setTimestamp(time);
46+
47+
for (Map.Entry<String, String> tag : tags) {
48+
point.setTag(tag.getKey(), tag.getValue());
49+
}
50+
51+
if (metric instanceof Counter) {
52+
return createPointForCounter((Counter) metric, point);
53+
}
54+
55+
if (metric instanceof Gauge) {
56+
return createPointForGauge((Gauge<?>) metric, point);
57+
}
58+
59+
if (metric instanceof Meter) {
60+
return createPointForMeter((Meter) metric, point);
61+
}
62+
63+
if (metric instanceof Histogram) {
64+
return createPointForHistogram((Histogram) metric, point);
65+
}
66+
67+
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
68+
}
69+
70+
private Point createPointForCounter(Counter counter, Point point) {
71+
return point.setField("count", counter.getCount());
72+
}
73+
74+
private Point createPointForGauge(Gauge<?> gauge, Point point) {
75+
Object value = gauge.getValue();
76+
77+
if (value instanceof Number) {
78+
return point.setField("value", ((Number) value));
79+
} else if (value instanceof Boolean) {
80+
return point.setField("value", ((boolean) value));
81+
} else {
82+
return point.setField("value", String.valueOf(value));
83+
}
84+
}
85+
86+
private Point createPointForMeter(Meter meter, Point point) {
87+
return point.setField("rate", meter.getRate()).setField("count", meter.getCount());
88+
}
89+
90+
private Point createPointForHistogram(Histogram histogram, Point point) {
91+
HistogramStatistics stats = histogram.getStatistics();
92+
return point.setField("count", histogram.getCount())
93+
.setField("mean", stats.getMean())
94+
.setField("stddev", stats.getStdDev())
95+
.setField("min", stats.getMin())
96+
.setField("max", stats.getMax())
97+
.setField("p50", stats.getQuantile(0.5))
98+
.setField("p75", stats.getQuantile(0.75))
99+
.setField("p95", stats.getQuantile(0.95))
100+
.setField("p98", stats.getQuantile(0.98))
101+
.setField("p99", stats.getQuantile(0.99))
102+
.setField("p999", stats.getQuantile(0.999));
103+
}
104+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.metrics.influxdb;
20+
21+
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.metrics.Metric;
23+
import org.apache.fluss.metrics.groups.MetricGroup;
24+
import org.apache.fluss.metrics.reporter.ScheduledMetricReporter;
25+
import org.apache.fluss.utils.MapUtils;
26+
27+
import com.influxdb.v3.client.InfluxDBClient;
28+
import com.influxdb.v3.client.Point;
29+
import com.influxdb.v3.client.config.ClientConfig;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.time.Duration;
34+
import java.time.Instant;
35+
import java.util.ArrayList;
36+
import java.util.HashMap;
37+
import java.util.List;
38+
import java.util.Map;
39+
40+
/** {@link ScheduledMetricReporter} that exports {@link Metric Metrics} via InfluxDB. */
41+
public class InfluxdbReporter implements ScheduledMetricReporter {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(InfluxdbReporter.class);
44+
45+
private static final char SCOPE_SEPARATOR = '_';
46+
private static final String SCOPE_PREFIX = "fluss" + SCOPE_SEPARATOR;
47+
48+
private final Map<Metric, String> metricNames;
49+
private final Map<Metric, List<Map.Entry<String, String>>> metricTags;
50+
private final InfluxDBClient client;
51+
private final Duration pushInterval;
52+
private final InfluxdbPointProducer pointProducer;
53+
54+
public InfluxdbReporter(
55+
String hostUrl, String org, String bucket, String token, Duration pushInterval) {
56+
ClientConfig clientConfig =
57+
new ClientConfig.Builder()
58+
.host(hostUrl)
59+
.token(token.toCharArray())
60+
.organization(org)
61+
.database(bucket)
62+
.build();
63+
64+
this.client = InfluxDBClient.getInstance(clientConfig);
65+
this.pushInterval = pushInterval;
66+
this.metricNames = MapUtils.newConcurrentHashMap();
67+
this.metricTags = MapUtils.newConcurrentHashMap();
68+
this.pointProducer = InfluxdbPointProducer.getInstance();
69+
70+
LOG.info("Started InfluxDB reporter connecting to {}", hostUrl);
71+
}
72+
73+
@Override
74+
public void open(Configuration config) {
75+
// do nothing
76+
}
77+
78+
@Override
79+
public void close() {
80+
if (client != null) {
81+
try {
82+
client.close();
83+
} catch (Exception e) {
84+
LOG.warn("Failed to close InfluxDB client", e);
85+
}
86+
}
87+
}
88+
89+
@Override
90+
public void report() {
91+
List<Point> points = new ArrayList<>();
92+
Instant now = Instant.now();
93+
94+
for (Map.Entry<Metric, String> entry : metricNames.entrySet()) {
95+
Metric metric = entry.getKey();
96+
String metricName = entry.getValue();
97+
List<Map.Entry<String, String>> tags = metricTags.get(metric);
98+
99+
try {
100+
Point point = pointProducer.createPoint(metric, metricName, tags, now);
101+
points.add(point);
102+
} catch (Exception e) {
103+
LOG.warn("Failed to create point for metric {}", metricName, e);
104+
}
105+
}
106+
107+
client.writePoints(points);
108+
}
109+
110+
@Override
111+
public Duration scheduleInterval() {
112+
return pushInterval;
113+
}
114+
115+
@Override
116+
public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
117+
String scopedMetricName = getScopedName(metricName, group);
118+
List<Map.Entry<String, String>> tags = getTags(group);
119+
120+
metricNames.put(metric, scopedMetricName);
121+
metricTags.put(metric, tags);
122+
}
123+
124+
@Override
125+
public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
126+
metricNames.remove(metric);
127+
metricTags.remove(metric);
128+
}
129+
130+
private String getScopedName(String metricName, MetricGroup group) {
131+
return SCOPE_PREFIX
132+
+ group.getLogicalScope(this::filterCharacters, SCOPE_SEPARATOR)
133+
+ SCOPE_SEPARATOR
134+
+ filterCharacters(metricName);
135+
}
136+
137+
private List<Map.Entry<String, String>> getTags(MetricGroup group) {
138+
List<Map.Entry<String, String>> tags = new ArrayList<>();
139+
for (Map.Entry<String, String> entry : group.getAllVariables().entrySet()) {
140+
tags.add(
141+
new HashMap.SimpleEntry<>(
142+
filterCharacters(entry.getKey()), filterCharacters(entry.getValue())));
143+
}
144+
return tags;
145+
}
146+
147+
private String filterCharacters(String input) {
148+
return input.replaceAll("[^a-zA-Z0-9_:]", String.valueOf(SCOPE_SEPARATOR));
149+
}
150+
}

0 commit comments

Comments
 (0)