Skip to content

Commit 5367030

Browse files
committed
IGNITE-27573 Adapt jraft node metrics to ignite metric framework
1 parent 81b6836 commit 5367030

37 files changed

Lines changed: 1218 additions & 558 deletions

.idea/codeStyles/Project.xml

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java

Lines changed: 7 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import static org.mockito.Mockito.times;
5757
import static org.mockito.Mockito.verify;
5858

59-
import com.codahale.metrics.ConsoleReporter;
6059
import com.lmax.disruptor.EventHandler;
6160
import com.lmax.disruptor.RingBuffer;
6261
import java.io.File;
@@ -101,6 +100,8 @@
101100
import org.apache.ignite.internal.logger.IgniteLogger;
102101
import org.apache.ignite.internal.logger.Loggers;
103102
import org.apache.ignite.internal.manager.ComponentContext;
103+
import org.apache.ignite.internal.metrics.TestMetricManager;
104+
import org.apache.ignite.internal.metrics.sources.FsmCallerMetricSource;
104105
import org.apache.ignite.internal.network.ClusterService;
105106
import org.apache.ignite.internal.network.StaticNodeFinder;
106107
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
@@ -338,7 +339,8 @@ public void testSmallestBufferSize() throws Exception {
338339
1,
339340
false,
340341
false,
341-
null
342+
new TestMetricManager(),
343+
FsmCallerMetricSource.SOURCE_NAME
342344
) {
343345
@Override
344346
public RingBuffer<ApplyTask> subscribe(
@@ -1667,40 +1669,6 @@ public void run(Status status, long index, byte[] reqCtx) {
16671669
assertEquals(10000, fsm.getLogs().size());
16681670
}
16691671

1670-
@Test
1671-
public void testNodeMetrics() throws Exception {
1672-
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
1673-
1674-
cluster = new TestCluster("unittest", dataPath, peers, testInfo);
1675-
for (TestPeer peer : peers)
1676-
assertTrue(cluster.start(peer, false, 300, true));
1677-
1678-
//elect and get leader
1679-
Node leader = cluster.waitAndGetLeader();
1680-
assertNotNull(leader);
1681-
assertEquals(3, leader.listPeers().size());
1682-
// apply tasks to leader
1683-
sendTestTaskAndWait(leader);
1684-
1685-
{
1686-
ByteBuffer data = ByteBuffer.wrap("no closure".getBytes(UTF_8));
1687-
Task task = new Task(data, null);
1688-
leader.apply(task);
1689-
}
1690-
1691-
cluster.ensureSame();
1692-
for (Node node : cluster.getNodes()) {
1693-
System.out.println("-------------" + node.getNodeId() + "-------------");
1694-
ConsoleReporter reporter = ConsoleReporter.forRegistry(node.getNodeMetrics().getMetricRegistry())
1695-
.build();
1696-
reporter.report();
1697-
reporter.close();
1698-
System.out.println();
1699-
}
1700-
// TODO check http status https://issues.apache.org/jira/browse/IGNITE-14832
1701-
assertEquals(2, cluster.getFollowers().size());
1702-
}
1703-
17041672
@Test
17051673
public void testLeaderFail() throws Exception {
17061674
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 3);
@@ -3331,7 +3299,7 @@ public void testBootStrapWithSnapshot() throws Exception {
33313299
opts.setGroupConf(JRaftUtils.getConfiguration(peer.getPeerId().toString()));
33323300
opts.setFsm(fsm);
33333301

3334-
assertTrue(JRaftUtils.bootstrap(opts));
3302+
assertTrue(JRaftUtils.bootstrap(opts, new TestMetricManager()));
33353303
assertThat(logStorageProvider.stopAsync(new ComponentContext()), willCompleteSuccessfully());
33363304

33373305
NodeOptions nodeOpts = new NodeOptions();
@@ -3378,7 +3346,7 @@ public void testBootStrapWithoutSnapshot() throws Exception {
33783346
opts.setGroupConf(JRaftUtils.getConfiguration(peer.getPeerId().toString()));
33793347
opts.setFsm(fsm);
33803348

3381-
assertTrue(JRaftUtils.bootstrap(opts));
3349+
assertTrue(JRaftUtils.bootstrap(opts, new TestMetricManager()));
33823350
assertThat(logStorageProvider.stopAsync(new ComponentContext()), willCompleteSuccessfully());
33833351

33843352
NodeOptions nodeOpts = new NodeOptions();
@@ -4973,7 +4941,7 @@ private RaftGroupService createService(String groupId, TestPeer peer, NodeOption
49734941

49744942
assertThat(clusterService.startAsync(new ComponentContext()), willCompleteSuccessfully());
49754943

4976-
var service = new RaftGroupService(groupId, peer.getPeerId(), nodeOptions, rpcServer) {
4944+
var service = new RaftGroupService(groupId, peer.getPeerId(), nodeOptions, rpcServer, new TestMetricManager()) {
49774945
@Override
49784946
public synchronized void shutdown() {
49794947
rpcServer.shutdown();

modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/TestCluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.ignite.internal.logger.IgniteLogger;
5454
import org.apache.ignite.internal.logger.Loggers;
5555
import org.apache.ignite.internal.manager.ComponentContext;
56+
import org.apache.ignite.internal.metrics.TestMetricManager;
5657
import org.apache.ignite.internal.network.ClusterService;
5758
import org.apache.ignite.internal.network.StaticNodeFinder;
5859
import org.apache.ignite.internal.raft.JraftGroupEventsListener;
@@ -217,7 +218,6 @@ public boolean start(TestPeer peer, boolean emptyPeers, int snapshotIntervalSecs
217218
nodeOptions.setServerName(peer.getPeerId().toString());
218219

219220
nodeOptions.setElectionTimeoutMs(this.electionTimeoutMs);
220-
nodeOptions.setEnableMetrics(enableMetrics);
221221
nodeOptions.setSnapshotThrottle(snapshotThrottle);
222222
nodeOptions.setSnapshotIntervalSecs(snapshotIntervalSecs);
223223
nodeOptions.setServiceFactory(this.raftServiceFactories.apply(peer.getPeerId()));
@@ -287,7 +287,7 @@ public boolean start(TestPeer peer, boolean emptyPeers, int snapshotIntervalSecs
287287
optsClo.accept(peer.getPeerId(), nodeOptions);
288288

289289
RaftGroupService server = new RaftGroupService(this.name, peer.getPeerId(),
290-
nodeOptions, rpcServer) {
290+
nodeOptions, rpcServer, new TestMetricManager()) {
291291
@Override public synchronized void shutdown() {
292292
nodeManager.shutdown();
293293
// This stop order is consistent with JRaftServerImpl
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.metrics.sources;
19+
20+
import com.lmax.disruptor.RingBuffer;
21+
import java.util.ArrayList;
22+
import java.util.stream.LongStream;
23+
import org.apache.ignite.internal.metrics.AbstractMetricSource;
24+
import org.apache.ignite.internal.metrics.DistributionMetric;
25+
import org.apache.ignite.internal.metrics.LongGauge;
26+
import org.apache.ignite.internal.metrics.Metric;
27+
28+
/**
29+
* Striped disruptor metrics.
30+
*/
31+
public class DisruptorMetricSource extends AbstractMetricSource<DisruptorMetricSource.Holder> {
32+
private final RingBuffer<?>[] ringBuffers;
33+
34+
/**
35+
* Constructor.
36+
*
37+
* @param sourceName Base source name.
38+
* @param ringBuffers Ring buffers related to the corresponding disruptor.
39+
*/
40+
public DisruptorMetricSource(String sourceName, RingBuffer<?>[] ringBuffers) {
41+
super(sourceName + ".disruptor");
42+
43+
this.ringBuffers = ringBuffers;
44+
}
45+
46+
/**
47+
* Adds a batch size sample.
48+
*
49+
* @param size Batch size.
50+
*/
51+
public void addBatchSize(long size) {
52+
Holder holder = holder();
53+
54+
if (holder != null) {
55+
holder.batchSizeHistogramMetric.add(size);
56+
}
57+
}
58+
59+
/**
60+
* Records a hit to the specified stripe.
61+
*
62+
* @param stripe Stripe index.
63+
*/
64+
public void hitToStripe(int stripe) {
65+
Holder holder = holder();
66+
67+
if (holder != null) {
68+
holder.stripeHistogramMetric.add(stripe);
69+
}
70+
}
71+
72+
@Override
73+
protected Holder createHolder() {
74+
return new Holder();
75+
}
76+
77+
/** Metric holder for disruptor metrics. */
78+
protected class Holder implements AbstractMetricSource.Holder<DisruptorMetricSource.Holder> {
79+
private final DistributionMetric batchSizeHistogramMetric = new DistributionMetric(
80+
"BatchSize",
81+
"The histogram of batch size in disruptor",
82+
new long[] {10L, 20L, 30L, 40L, 50L}
83+
);
84+
85+
private final DistributionMetric stripeHistogramMetric = new DistributionMetric(
86+
"Stripe",
87+
"The histogram of stripes in disruptor",
88+
LongStream.range(0, ringBuffers.length).toArray()
89+
);
90+
91+
private final ArrayList<Metric> metrics;
92+
93+
Holder() {
94+
metrics = new ArrayList<>();
95+
96+
metrics.add(batchSizeHistogramMetric);
97+
metrics.add(stripeHistogramMetric);
98+
99+
for (int i = 0; i < ringBuffers.length; i++) {
100+
RingBuffer<?> ringBuffer = ringBuffers[i];
101+
102+
metrics.add(new LongGauge(
103+
"StripeRemainingCapacity_" + i,
104+
"The remaining capacity of stripe " + i,
105+
ringBuffer::remainingCapacity
106+
));
107+
108+
metrics.add(new LongGauge(
109+
"StripeBufferSize_" + i,
110+
"The buffer size of stripe " + i,
111+
ringBuffer::getBufferSize
112+
));
113+
}
114+
}
115+
116+
@Override
117+
public Iterable<Metric> metrics() {
118+
return metrics;
119+
}
120+
}
121+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.metrics.sources;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.apache.ignite.internal.metrics.AbstractMetricSource;
23+
import org.apache.ignite.internal.metrics.AtomicLongMetric;
24+
import org.apache.ignite.internal.metrics.DistributionMetric;
25+
import org.apache.ignite.internal.metrics.LongGauge;
26+
import org.apache.ignite.internal.metrics.Metric;
27+
import org.apache.ignite.raft.jraft.core.FSMCallerImpl;
28+
import org.apache.ignite.raft.jraft.core.FSMCallerImpl.TaskType;
29+
30+
/**
31+
* Metrics of FSM caller.
32+
*/
33+
public class FsmCallerMetricSource extends AbstractMetricSource<FsmCallerMetricSource.Holder> {
34+
public static final String SOURCE_NAME = "raft.fsmcaller";
35+
36+
/**
37+
* Constructor.
38+
*/
39+
public FsmCallerMetricSource(String groupId) {
40+
super(sourceName(groupId));
41+
}
42+
43+
private static String sourceName(String groupId) {
44+
return SOURCE_NAME + '.' + groupId;
45+
}
46+
47+
@Override
48+
protected Holder createHolder() {
49+
return new Holder();
50+
}
51+
52+
/**
53+
* Called on FSM commit.
54+
*
55+
* @param time Duration of the commit operation.
56+
*/
57+
public void onFsmCommit(long time) {
58+
Holder holder = holder();
59+
60+
if (holder != null) {
61+
holder.lastCommitTime.value(time);
62+
}
63+
}
64+
65+
/**
66+
* Called on applying tasks.
67+
*
68+
* @param time Duration of the apply operation.
69+
* @param size Number of tasks applied.
70+
*/
71+
public void onApplyTasks(long time, long size) {
72+
Holder holder = holder();
73+
74+
if (holder != null) {
75+
holder.lastApplyTasksTime.value(time);
76+
holder.applyTasksSize.add(size);
77+
}
78+
}
79+
80+
/** Metric holder for FSM caller metrics. */
81+
static class Holder implements AbstractMetricSource.Holder<Holder> {
82+
private final DistributionMetric applyTasksSize = new DistributionMetric(
83+
"ApplyTasksSize",
84+
"Sizes of applied tasks batches",
85+
new long[] {10, 20, 30, 40, 50}
86+
);
87+
88+
private final AtomicLongMetric lastApplyTasksTime = new AtomicLongMetric("ApplyTasksTime", "Time to apply tasks");
89+
90+
private final AtomicLongMetric lastCommitTime = new AtomicLongMetric("CommitTime", "Time to apply tasks");
91+
92+
private final List<Metric> metrics;
93+
94+
Holder() {
95+
metrics = new ArrayList<>();
96+
97+
metrics.add(applyTasksSize);
98+
metrics.add(lastApplyTasksTime);
99+
metrics.add(lastCommitTime);
100+
101+
for (TaskType type : FSMCallerImpl.TaskType.values()) {
102+
metrics.add(new LongGauge(type.metricName(), "Time to execute " + type.name() + " task", type::getApplyDuration));
103+
}
104+
}
105+
106+
@Override
107+
public Iterable<Metric> metrics() {
108+
return metrics;
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)