Skip to content

Commit 621573c

Browse files
committed
[lake/tiering] Add metrics for lake table tiering
1 parent 4f3c457 commit 621573c

12 files changed

Lines changed: 363 additions & 12 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ public class MetricNames {
5353
public static final String KV_ALL_SNAPSHOT_SIZE = "allKvSnapshotSize";
5454
public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE = "remoteKvSize";
5555

56+
// for lake tiering metrics - global level
57+
public static final String LAKE_TIERING_PENDING_TABLES_COUNT = "pendingTablesCount";
58+
public static final String LAKE_TIERING_RUNNING_TABLES_COUNT = "runningTablesCount";
59+
public static final String LAKE_TIERING_FAILURES_TOTAL = "failuresTotal";
60+
61+
// for lake tiering table-level metrics
62+
public static final String LAKE_TIERING_LAST_SUCCESS_AGE_MS = "lastSuccessAgeMs";
63+
public static final String LAKE_TIERING_LAST_DURATION_MS = "lastDurationMs";
64+
public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal";
65+
public static final String LAKE_TIERING_TABLE_STATE = "state";
66+
5667
// --------------------------------------------------------------------------------------------
5768
// metrics for tablet server
5869
// --------------------------------------------------------------------------------------------

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,19 @@
2222
import org.apache.fluss.lake.committer.CommitterInitContext;
2323
import org.apache.fluss.lake.committer.LakeCommitter;
2424
import org.apache.fluss.metadata.TablePath;
25+
import org.apache.fluss.utils.types.Tuple2;
2526

2627
import org.apache.paimon.CoreOptions;
28+
import org.apache.paimon.FileStore;
2729
import org.apache.paimon.Snapshot;
2830
import org.apache.paimon.catalog.Catalog;
31+
import org.apache.paimon.manifest.FileKind;
2932
import org.apache.paimon.manifest.IndexManifestEntry;
3033
import org.apache.paimon.manifest.ManifestCommittable;
3134
import org.apache.paimon.manifest.ManifestEntry;
35+
import org.apache.paimon.manifest.ManifestFile;
36+
import org.apache.paimon.manifest.ManifestFileMeta;
37+
import org.apache.paimon.manifest.ManifestList;
3238
import org.apache.paimon.manifest.SimpleFileEntry;
3339
import org.apache.paimon.table.FileStoreTable;
3440
import org.apache.paimon.table.sink.CommitCallback;
@@ -92,6 +98,14 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotPr
9298
tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
9399
tableCommit.commit(manifestCommittable);
94100

101+
Snapshot snapshot = fileStoreTable.snapshotManager().latestSnapshot();
102+
if (snapshot != null) {
103+
Tuple2<Integer, Long> info = manifestListInfo(fileStoreTable.store(), snapshot);
104+
System.out.printf(
105+
"Committed snapshot %d with %d files and %d bytes.%n",
106+
snapshot.id(), info.f0, info.f1);
107+
}
108+
95109
Long commitSnapshotId = currentCommitSnapshotId.get();
96110
currentCommitSnapshotId.remove();
97111

@@ -105,6 +119,27 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotPr
105119
}
106120
}
107121

122+
private Tuple2<Integer, Long> manifestListInfo(FileStore<?> store, Snapshot snapshot) {
123+
ManifestList manifestList = store.manifestListFactory().create();
124+
ManifestFile manifestFile = store.manifestFileFactory().create();
125+
List<ManifestFileMeta> manifestFileMetas = manifestList.readDataManifests(snapshot);
126+
int fileCount = 0;
127+
long fileSize = 0;
128+
for (ManifestFileMeta manifestFileMeta : manifestFileMetas) {
129+
List<ManifestEntry> manifestEntries = manifestFile.read(manifestFileMeta.fileName());
130+
for (ManifestEntry entry : manifestEntries) {
131+
if (entry.kind() == FileKind.ADD) {
132+
fileSize += entry.file().fileSize();
133+
fileCount++;
134+
} else {
135+
fileSize -= entry.file().fileSize();
136+
fileCount--;
137+
}
138+
}
139+
}
140+
return Tuple2.of(fileCount, fileSize);
141+
}
142+
108143
@Override
109144
public void abort(PaimonCommittable committable) throws IOException {
110145
tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.fluss.server.metadata.ServerMetadataCache;
3939
import org.apache.fluss.server.metrics.ServerMetricUtils;
4040
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
41+
import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
4142
import org.apache.fluss.server.zk.ZooKeeperClient;
4243
import org.apache.fluss.server.zk.ZooKeeperUtils;
4344
import org.apache.fluss.server.zk.data.CoordinatorAddress;
@@ -188,7 +189,9 @@ protected void startServices() throws Exception {
188189
authorizer.startup();
189190
}
190191

191-
this.lakeTableTieringManager = new LakeTableTieringManager();
192+
this.lakeTableTieringManager =
193+
new LakeTableTieringManager(
194+
new LakeTieringMetricGroup(metricRegistry, serverMetricGroup));
192195

193196
MetadataManager metadataManager =
194197
new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java

Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
import org.apache.fluss.exception.TableNotExistException;
2424
import org.apache.fluss.metadata.TableInfo;
2525
import org.apache.fluss.metadata.TablePath;
26+
import org.apache.fluss.metrics.MetricNames;
27+
import org.apache.fluss.metrics.groups.MetricGroup;
2628
import org.apache.fluss.server.entity.LakeTieringTableInfo;
29+
import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
2730
import org.apache.fluss.server.utils.timer.DefaultTimer;
2831
import org.apache.fluss.server.utils.timer.Timer;
2932
import org.apache.fluss.server.utils.timer.TimerTask;
@@ -126,25 +129,38 @@ public class LakeTableTieringManager implements AutoCloseable {
126129
// table_id -> the last timestamp of tiered lake snapshot
127130
private final Map<Long, Long> tableLastTieredTime;
128131

132+
// table_id -> the last tiering duration in milliseconds
133+
private final Map<Long, Long> tableLastTieringDuration;
134+
135+
// table_id -> the tiering failure count
136+
private final Map<Long, Long> tableTieringFailureCount;
137+
129138
// the live tables that are tiering,
130139
// from table_id -> last heartbeat time by the tiering service
131140
private final Map<Long, Long> liveTieringTableIds;
132141

142+
// global tiering failure count
143+
private long globalTieringFailureCount = 0;
144+
133145
private final Lock lock = new ReentrantLock();
134146

135-
public LakeTableTieringManager() {
147+
private final LakeTieringMetricGroup lakeTieringMetricGroup;
148+
149+
public LakeTableTieringManager(LakeTieringMetricGroup lakeTieringMetricGroup) {
136150
this(
137151
new DefaultTimer("delay lake tiering", 1_000, 20),
138152
Executors.newSingleThreadScheduledExecutor(
139153
new ExecutorThreadFactory("fluss-lake-tiering-timeout-checker")),
140-
SystemClock.getInstance());
154+
SystemClock.getInstance(),
155+
lakeTieringMetricGroup);
141156
}
142157

143158
@VisibleForTesting
144159
protected LakeTableTieringManager(
145160
Timer lakeTieringScheduleTimer,
146161
ScheduledExecutorService lakeTieringServiceTimeoutChecker,
147-
Clock clock) {
162+
Clock clock,
163+
LakeTieringMetricGroup lakeTieringMetricGroup) {
148164
this.lakeTieringScheduleTimer = lakeTieringScheduleTimer;
149165
this.lakeTieringServiceTimeoutChecker = lakeTieringServiceTimeoutChecker;
150166
this.clock = clock;
@@ -159,6 +175,19 @@ protected LakeTableTieringManager(
159175
this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS);
160176
this.tableTierEpoch = new HashMap<>();
161177
this.tableLastTieredTime = new HashMap<>();
178+
this.tableLastTieringDuration = new HashMap<>();
179+
this.tableTieringFailureCount = new HashMap<>();
180+
this.lakeTieringMetricGroup = lakeTieringMetricGroup;
181+
registerMetrics();
182+
}
183+
184+
private void registerMetrics() {
185+
lakeTieringMetricGroup.gauge(
186+
MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT, pendingTieringTables::size);
187+
lakeTieringMetricGroup.gauge(
188+
MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT, liveTieringTableIds::size);
189+
lakeTieringMetricGroup.gauge(
190+
MetricNames.LAKE_TIERING_FAILURES_TOTAL, () -> globalTieringFailureCount);
162191
}
163192

164193
public void initWithLakeTables(List<Tuple2<TableInfo, Long>> tableInfoWithTieredTime) {
@@ -196,6 +225,11 @@ private void registerLakeTable(TableInfo tableInfo, long lastTieredTime) {
196225
tableId, tableInfo.getTableConfig().getDataLakeFreshness().toMillis());
197226
tableLastTieredTime.put(tableId, lastTieredTime);
198227
tableTierEpoch.put(tableId, 0L);
228+
tableLastTieringDuration.put(tableId, 0L);
229+
tableTieringFailureCount.put(tableId, 0L);
230+
231+
// register table-level metrics
232+
registerTableMetrics(tableId, tableInfo.getTablePath());
199233
}
200234

201235
private void scheduleTableTiering(long tableId) {
@@ -211,13 +245,50 @@ private void scheduleTableTiering(long tableId) {
211245
lakeTieringScheduleTimer.add(new DelayedTiering(tableId, delayMs));
212246
}
213247

248+
private void registerTableMetrics(long tableId, TablePath tablePath) {
249+
// create table-level metric group
250+
MetricGroup tableMetricGroup =
251+
lakeTieringMetricGroup.addGroup(
252+
"table", tablePath.getDatabaseName() + "." + tablePath.getTableName());
253+
254+
// lastSuccessAgeMs: milliseconds since last successful tiering
255+
tableMetricGroup.gauge(
256+
MetricNames.LAKE_TIERING_LAST_SUCCESS_AGE_MS,
257+
() -> {
258+
Long lastTiered = tableLastTieredTime.get(tableId);
259+
return lastTiered != null ? clock.milliseconds() - lastTiered : -1L;
260+
});
261+
262+
// lastDurationMs: duration of last tiering job
263+
tableMetricGroup.gauge(
264+
MetricNames.LAKE_TIERING_LAST_DURATION_MS,
265+
() -> tableLastTieringDuration.getOrDefault(tableId, 0L));
266+
267+
// failuresTotal: total failure count for this table
268+
tableMetricGroup.gauge(
269+
MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL,
270+
() -> tableTieringFailureCount.getOrDefault(tableId, 0L));
271+
272+
// state: current tiering state represented as ordinal value
273+
// (0=New, 1=Initialized, 2=Scheduled, 3=Pending, 4=Tiering, 5=Tiered, 6=Failed, -1=table
274+
// not exist)
275+
tableMetricGroup.gauge(
276+
MetricNames.LAKE_TIERING_TABLE_STATE,
277+
() -> {
278+
TieringState state = tieringStates.get(tableId);
279+
return state != null ? state.ordinal() : -1;
280+
});
281+
}
282+
214283
public void removeLakeTable(long tableId) {
215284
inLock(
216285
lock,
217286
() -> {
218287
tablePaths.remove(tableId);
219288
tableLakeFreshness.remove(tableId);
220289
tableLastTieredTime.remove(tableId);
290+
tableLastTieringDuration.remove(tableId);
291+
tableTieringFailureCount.remove(tableId);
221292
tieringStates.remove(tableId);
222293
liveTieringTableIds.remove(tableId);
223294
tableTierEpoch.remove(tableId);
@@ -392,9 +463,17 @@ private void doHandleStateChange(long tableId, TieringState targetState) {
392463
break;
393464
case Tiered:
394465
tableLastTieredTime.put(tableId, clock.milliseconds());
395-
liveTieringTableIds.remove(tableId);
466+
// calculate and record tiering duration
467+
Long startTime = liveTieringTableIds.remove(tableId);
468+
if (startTime != null) {
469+
long duration = clock.milliseconds() - startTime;
470+
tableLastTieringDuration.put(tableId, duration);
471+
}
396472
break;
397473
case Failed:
474+
// increment failure counters
475+
tableTieringFailureCount.computeIfPresent(tableId, (t, v) -> v + 1);
476+
globalTieringFailureCount++;
398477
liveTieringTableIds.remove(tableId);
399478
// do nothing
400479
break;
@@ -480,7 +559,8 @@ private void advanceClock() throws InterruptedException {
480559
}
481560
}
482561

483-
private enum TieringState {
562+
@VisibleForTesting
563+
enum TieringState {
484564
// When a new lake table is created, the state will be New
485565
New {
486566
@Override
@@ -537,4 +617,43 @@ public Set<TieringState> validPreviousStates() {
537617

538618
abstract Set<TieringState> validPreviousStates();
539619
}
620+
621+
// ------------------------------------------------------------------------
622+
// Test-only methods
623+
// ------------------------------------------------------------------------
624+
625+
@VisibleForTesting
626+
protected int getPendingTablesCount() {
627+
return pendingTieringTables.size();
628+
}
629+
630+
@VisibleForTesting
631+
protected int getRunningTablesCount() {
632+
return liveTieringTableIds.size();
633+
}
634+
635+
@VisibleForTesting
636+
protected long getGlobalFailureCount() {
637+
return globalTieringFailureCount;
638+
}
639+
640+
@VisibleForTesting
641+
protected Long getTableLastSuccessTime(long tableId) {
642+
return tableLastTieredTime.get(tableId);
643+
}
644+
645+
@VisibleForTesting
646+
protected Long getTableLastDuration(long tableId) {
647+
return tableLastTieringDuration.get(tableId);
648+
}
649+
650+
@VisibleForTesting
651+
protected Long getTableFailureCount(long tableId) {
652+
return tableTieringFailureCount.get(tableId);
653+
}
654+
655+
@VisibleForTesting
656+
protected TieringState getTableState(long tableId) {
657+
return tieringStates.get(tableId);
658+
}
540659
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.server.metrics.group;
20+
21+
import org.apache.fluss.metrics.CharacterFilter;
22+
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
23+
import org.apache.fluss.metrics.registry.MetricRegistry;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
29+
30+
/** Metrics for lake tiering. */
31+
public class LakeTieringMetricGroup extends AbstractMetricGroup {
32+
33+
private static final Logger LOG = LoggerFactory.getLogger(LakeTieringMetricGroup.class);
34+
35+
public LakeTieringMetricGroup(MetricRegistry registry, CoordinatorMetricGroup parent) {
36+
super(registry, makeScope(parent), parent);
37+
}
38+
39+
@Override
40+
protected String getGroupName(CharacterFilter filter) {
41+
return "lakeTiering";
42+
}
43+
}

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,7 @@ private void registerLakeTieringMetrics() {
535535
MetricNames.LOG_LAKE_PENDING_RECORDS,
536536
() ->
537537
getLakeLogEndOffset() < 0L
538-
? -1
538+
? getLogHighWatermark() - getLogStartOffset()
539539
: getLogHighWatermark() - getLakeLogEndOffset());
540540
lakeTieringMetricGroup.gauge(
541541
MetricNames.LOG_LAKE_TIMESTAMP_LAG,

fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ void beforeEach() throws IOException {
190190
testCoordinatorChannelManager = new TestCoordinatorChannelManager();
191191
autoPartitionManager =
192192
new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration());
193-
lakeTableTieringManager = new LakeTableTieringManager();
193+
lakeTableTieringManager =
194+
new LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
194195
Configuration conf = new Configuration();
195196
conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
196197
eventProcessor = buildCoordinatorEventProcessor();

0 commit comments

Comments
 (0)