diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/metric/SparkMetricGroup.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/metric/SparkMetricGroup.java
new file mode 100644
index 000000000000..4eb30bf054f1
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/metric/SparkMetricGroup.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.metric;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroupImpl;
+
+import com.codahale.metrics.MetricRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A {@link org.apache.paimon.metrics.MetricGroup} that extends {@link MetricGroupImpl} to
+ * additionally register metrics with a Codahale {@link MetricRegistry} for exposure via Spark's
+ * MetricsSystem (JMX, Prometheus, etc.).
+ *
+ *
+ * <p>This preserves full backward compatibility with the existing Spark UI integration (which reads
+ * from {@link MetricGroupImpl#getMetrics()}) while also making metrics available to external
+ * monitoring systems.
+ */
+public class SparkMetricGroup extends MetricGroupImpl {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkMetricGroup.class);
+
+ private final MetricRegistry codahaleRegistry;
+ private final String metricPrefix;
+ private final Set<String> registeredNames;
+
+ public SparkMetricGroup(
+ String groupName, Map<String, String> variables, MetricRegistry codahaleRegistry) {
+ super(groupName, variables);
+ this.codahaleRegistry = codahaleRegistry;
+ this.metricPrefix = buildPrefix(groupName, variables);
+ this.registeredNames = new HashSet<>();
+ }
+
+ @Override
+ public Counter counter(String name) {
+ Counter paimonCounter = super.counter(name);
+ if (paimonCounter != null) {
+ registerCodahaleGauge(
+ codahaleName(name), (com.codahale.metrics.Gauge<Long>) paimonCounter::getCount);
+ }
+ return paimonCounter;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+ Gauge<T> paimonGauge = super.gauge(name, gauge);
+ if (paimonGauge != null) {
+ registerCodahaleGauge(
+ codahaleName(name), (com.codahale.metrics.Gauge<T>) paimonGauge::getValue);
+ }
+ return paimonGauge;
+ }
+
+ @Override
+ public Histogram histogram(String name, int windowSize) {
+ Histogram paimonHistogram = super.histogram(name, windowSize);
+ if (paimonHistogram != null) {
+ String base = codahaleName(name);
+ registerCodahaleGauge(
+ base + ".count", (com.codahale.metrics.Gauge<Long>) paimonHistogram::getCount);
+ registerCodahaleGauge(
+ base + ".mean",
+ (com.codahale.metrics.Gauge<Double>)
+ () -> paimonHistogram.getStatistics().getMean());
+ registerCodahaleGauge(
+ base + ".min",
+ (com.codahale.metrics.Gauge<Long>)
+ () -> paimonHistogram.getStatistics().getMin());
+ registerCodahaleGauge(
+ base + ".max",
+ (com.codahale.metrics.Gauge<Long>)
+ () -> paimonHistogram.getStatistics().getMax());
+ registerCodahaleGauge(
+ base + ".p50",
+ (com.codahale.metrics.Gauge<Double>)
+ () -> paimonHistogram.getStatistics().getQuantile(0.5));
+ registerCodahaleGauge(
+ base + ".p95",
+ (com.codahale.metrics.Gauge<Double>)
+ () -> paimonHistogram.getStatistics().getQuantile(0.95));
+ registerCodahaleGauge(
+ base + ".p99",
+ (com.codahale.metrics.Gauge<Double>)
+ () -> paimonHistogram.getStatistics().getQuantile(0.99));
+ }
+ return paimonHistogram;
+ }
+
+ @Override
+ public void close() {
+ for (String name : registeredNames) {
+ codahaleRegistry.remove(name);
+ }
+ registeredNames.clear();
+ super.close();
+ }
+
+ private String codahaleName(String metricName) {
+ return metricPrefix + "." + metricName;
+ }
+
+ private void registerCodahaleGauge(String name, com.codahale.metrics.Gauge<?> gauge) {
+ // Always remove first to ensure we replace any stale gauge from a previous commit.
+ // This avoids a race window in the try-register/catch-remove/re-register pattern.
+ codahaleRegistry.remove(name);
+ try {
+ codahaleRegistry.register(name, gauge);
+ registeredNames.add(name);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Failed to register Codahale metric '{}': {}", name, e.getMessage());
+ }
+ }
+
+ private static String buildPrefix(String groupName, Map<String, String> variables) {
+ StringBuilder sb = new StringBuilder("paimon");
+ String table = variables.get("table");
+ if (table != null && !table.isEmpty()) {
+ sb.append('.').append(sanitize(table));
+ }
+ sb.append('.').append(sanitize(groupName));
+ return sb.toString();
+ }
+
+ private static String sanitize(String name) {
+ return name.replaceAll("[^a-zA-Z0-9_]", "_");
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 26ca12197c70..dfb00ce31f9f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -32,6 +32,7 @@ import org.apache.paimon.io.{CompactIncrement, DataIncrement}
import org.apache.paimon.manifest.FileKind
import org.apache.paimon.spark.{SparkRow, SparkTypeUtils}
import org.apache.paimon.spark.catalog.functions.BucketFunction
+import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
import org.apache.paimon.spark.sort.TableSorter
import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
@@ -421,7 +422,9 @@ case class PaimonSparkWriter(
} else {
writeBuilder
}
+ val metricRegistry = SparkMetricRegistry()
val tableCommit = finalWriteBuilder.newCommit()
+ tableCommit.withMetricRegistry(metricRegistry)
try {
tableCommit.commit(commitMessages.toList.asJava)
} catch {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
index 9aeeed7a03cb..73cdf440bd87 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala
@@ -18,11 +18,12 @@
package org.apache.paimon.spark.metric
-import org.apache.paimon.metrics.{Gauge, Metric, MetricGroup, MetricGroupImpl, MetricRegistry}
+import org.apache.paimon.metrics.{Gauge, Metric, MetricGroup, MetricRegistry}
import org.apache.paimon.operation.metrics.{CommitMetrics, ScanMetrics, WriterBufferMetric}
import org.apache.paimon.spark._
import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.paimon.PaimonMetricsSource
import java.util.{Map => JMap}
@@ -35,7 +36,8 @@ case class SparkMetricRegistry() extends MetricRegistry {
override def createMetricGroup(
groupName: String,
variables: JMap[String, String]): MetricGroup = {
- val metricGroup = new MetricGroupImpl(groupName, variables)
+ val source = PaimonMetricsSource.getOrCreate()
+ val metricGroup = new SparkMetricGroup(groupName, variables, source.metricRegistry)
metricGroups.put(groupName, metricGroup)
metricGroup
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonMetricsSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonMetricsSource.scala
new file mode 100644
index 000000000000..94e569863105
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonMetricsSource.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.paimon
+
+import com.codahale.metrics.jmx.JmxReporter
+import org.apache.spark.SparkEnv
+import org.apache.spark.metrics.source.Source
+
+/**
+ * A Spark [[Source]] that holds a shared Codahale [[com.codahale.metrics.MetricRegistry]]. Paimon
+ * metrics registered via [[org.apache.paimon.spark.metric.SparkMetricGroup]] are exposed through
+ * this source.
+ *
+ * A [[JmxReporter]] is started on the registry so that metrics are registered as JMX MBeans
+ * immediately when added. This is necessary because Spark's MetricsSystem takes a snapshot of the
+ * registry at `registerSource` time and does not pick up metrics added later.
+ *
+ * This class must live under `org.apache.spark` because Spark's [[Source]] trait is
+ * package-private.
+ *
+ * Use [[PaimonMetricsSource.getOrCreate()]] to obtain the singleton instance.
+ */
+class PaimonMetricsSource extends Source {
+
+ override val sourceName: String = "paimon"
+
+ override val metricRegistry: com.codahale.metrics.MetricRegistry =
+ new com.codahale.metrics.MetricRegistry()
+
+ private val jmxReporter: JmxReporter = JmxReporter
+ .forRegistry(metricRegistry)
+ .inDomain("paimon")
+ .build()
+
+ jmxReporter.start()
+
+ def stop(): Unit = {
+ jmxReporter.stop()
+ }
+}
+
+object PaimonMetricsSource {
+
+ @volatile private var instance: PaimonMetricsSource = _
+
+ def getOrCreate(): PaimonMetricsSource = {
+ if (instance == null) {
+ synchronized {
+ if (instance == null) {
+ val source = new PaimonMetricsSource()
+ val env = SparkEnv.get
+ if (env != null) {
+ env.metricsSystem.registerSource(source)
+ }
+ Runtime.getRuntime.addShutdownHook(new Thread("paimon-jmx-reporter-shutdown") {
+ override def run(): Unit = source.stop()
+ })
+ instance = source
+ }
+ }
+ }
+ instance
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/metric/SparkMetricGroupTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/metric/SparkMetricGroupTest.java
new file mode 100644
index 000000000000..16bc97f6a85e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/metric/SparkMetricGroupTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.metric;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+
+import com.codahale.metrics.MetricRegistry;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SparkMetricGroup}. */
+public class SparkMetricGroupTest {
+
+ @Test
+ public void testGaugeRegisteredInCodahaleRegistry() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "myTable");
+
+ SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry);
+ group.gauge("lastCommitDuration", () -> 42L);
+
+ // Verify the Paimon metric is registered
+ Map<String, Metric> paimonMetrics = group.getMetrics();
+ assertThat(paimonMetrics).containsKey("lastCommitDuration");
+ assertThat(((Gauge<?>) paimonMetrics.get("lastCommitDuration")).getValue())
+ .isEqualTo(42L);
+
+ // Verify the Codahale metric is registered with the correct name
+ String expectedName = "paimon.myTable.commit.lastCommitDuration";
+ assertThat(codahaleRegistry.getGauges()).containsKey(expectedName);
+ assertThat(codahaleRegistry.getGauges().get(expectedName).getValue()).isEqualTo(42L);
+ }
+
+ @Test
+ public void testCounterRegisteredInCodahaleRegistry() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "myTable");
+
+ SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry);
+ Counter counter = group.counter("numCommits");
+ counter.inc();
+ counter.inc();
+ counter.inc();
+
+ // Verify the Codahale gauge wraps the counter's count
+ String expectedName = "paimon.myTable.commit.numCommits";
+ assertThat(codahaleRegistry.getGauges()).containsKey(expectedName);
+ assertThat(codahaleRegistry.getGauges().get(expectedName).getValue()).isEqualTo(3L);
+ }
+
+ @Test
+ public void testHistogramRegisteredInCodahaleRegistry() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "myTable");
+
+ SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry);
+ Histogram histogram = group.histogram("commitDuration", 100);
+ histogram.update(100);
+ histogram.update(200);
+ histogram.update(300);
+
+ String prefix = "paimon.myTable.commit.commitDuration";
+
+ // Verify all histogram sub-metrics are registered
+ assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".count");
+ assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".mean");
+ assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".min");
+ assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".max");
+ assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".p50");
+ assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".p95");
+ assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".p99");
+
+ // Verify values
+ assertThat(codahaleRegistry.getGauges().get(prefix + ".count").getValue()).isEqualTo(3L);
+ assertThat(codahaleRegistry.getGauges().get(prefix + ".min").getValue()).isEqualTo(100L);
+ assertThat(codahaleRegistry.getGauges().get(prefix + ".max").getValue()).isEqualTo(300L);
+ }
+
+ @Test
+ public void testGaugeValueUpdatesLazily() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "myTable");
+
+ SparkMetricGroup group = new SparkMetricGroup("scan", variables, codahaleRegistry);
+
+ long[] holder = {0L};
+ group.gauge("lastScanDuration", () -> holder[0]);
+
+ String codahaleName = "paimon.myTable.scan.lastScanDuration";
+ assertThat(codahaleRegistry.getGauges().get(codahaleName).getValue()).isEqualTo(0L);
+
+ // Mutate the backing value and verify the Codahale gauge reflects it
+ holder[0] = 999L;
+ assertThat(codahaleRegistry.getGauges().get(codahaleName).getValue()).isEqualTo(999L);
+ }
+
+ @Test
+ public void testCloseRemovesCodahaleMetrics() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "myTable");
+
+ SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry);
+ group.gauge("g1", () -> 1L);
+ group.counter("c1");
+ group.histogram("h1", 100);
+
+ // Verify metrics exist
+ assertThat(codahaleRegistry.getGauges()).isNotEmpty();
+
+ group.close();
+
+ // Verify all Codahale metrics are removed
+ assertThat(codahaleRegistry.getMetrics()).isEmpty();
+ }
+
+ @Test
+ public void testMetricNameSanitization() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "my-db.my-table");
+
+ SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry);
+ group.gauge("someMetric", () -> 1L);
+
+ // Special characters in table name should be replaced with underscores
+ String expectedName = "paimon.my_db_my_table.commit.someMetric";
+ assertThat(codahaleRegistry.getGauges()).containsKey(expectedName);
+ }
+
+ @Test
+ public void testMetricNameWithoutTableVariable() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+
+ SparkMetricGroup group = new SparkMetricGroup("scan", variables, codahaleRegistry);
+ group.gauge("duration", () -> 100L);
+
+ // Without a table variable, the prefix should be paimon.
+ String expectedName = "paimon.scan.duration";
+ assertThat(codahaleRegistry.getGauges()).containsKey(expectedName);
+ }
+
+ @Test
+ public void testMetricReplaceOnCollision() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "myTable");
+
+ // Pre-register a Codahale gauge with the same name
+ String metricName = "paimon.myTable.commit.existingMetric";
+ codahaleRegistry.register(metricName, (com.codahale.metrics.Gauge<Long>) () -> -1L);
+ assertThat(codahaleRegistry.getGauges().get(metricName).getValue()).isEqualTo(-1L);
+
+ // Now register via SparkMetricGroup - should replace the existing one
+ SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry);
+ group.gauge("existingMetric", () -> 42L);
+
+ assertThat(codahaleRegistry.getGauges().get(metricName).getValue()).isEqualTo(42L);
+ }
+
+ @Test
+ public void testPaimonMetricsPreservedForSparkUI() {
+ MetricRegistry codahaleRegistry = new MetricRegistry();
+ Map<String, String> variables = new HashMap<>();
+ variables.put("table", "myTable");
+
+ SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry);
+ group.gauge("lastCommitDuration", () -> 100L);
+ group.counter("numOps");
+ group.histogram("commitDuration", 100);
+
+ // Verify getMetrics() still works for Spark UI integration (backward compat)
+ Map<String, Metric> paimonMetrics = group.getMetrics();
+ assertThat(paimonMetrics).containsKey("lastCommitDuration");
+ assertThat(paimonMetrics).containsKey("numOps");
+ assertThat(paimonMetrics).containsKey("commitDuration");
+ assertThat(paimonMetrics).hasSize(3);
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index c74db8744e70..8af90a3d06a4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -193,6 +193,88 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
}
}
+ private def getCodahaleRegistry: com.codahale.metrics.MetricRegistry = {
+ val sourceClass = Class.forName("org.apache.spark.sql.paimon.PaimonMetricsSource$")
+ val companion = sourceClass.getField("MODULE$").get(null)
+ val source = sourceClass.getMethod("getOrCreate").invoke(companion)
+ source.getClass
+ .getMethod("metricRegistry")
+ .invoke(source)
+ .asInstanceOf[com.codahale.metrics.MetricRegistry]
+ }
+
+ test("Paimon Metric: commit metrics registered in Codahale registry after write") {
+ withTable("T") {
+ sql("CREATE TABLE T (id INT, name STRING)")
+ sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
+
+ val gauges = getCodahaleRegistry.getGauges
+ val commitGauges =
+ gauges.keySet().toArray.map(_.toString).filter(_.startsWith("paimon.T.commit."))
+
+ Assertions.assertTrue(
+ commitGauges.nonEmpty,
+ s"Expected commit metrics in Codahale registry but found none. " +
+ s"All metrics: ${gauges.keySet()}")
+
+ Assertions.assertTrue(
+ commitGauges.exists(_.endsWith(".lastCommitDuration")),
+ "Missing lastCommitDuration metric")
+ Assertions.assertTrue(
+ commitGauges.exists(_.endsWith(".lastTableFilesAdded")),
+ "Missing lastTableFilesAdded metric")
+ Assertions.assertTrue(
+ commitGauges.exists(_.endsWith(".lastDeltaRecordsAppended")),
+ "Missing lastDeltaRecordsAppended metric")
+ }
+ }
+
+ test("Paimon Metric: Codahale metrics reflect actual commit values") {
+ withTable("T") {
+ sql("CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt)")
+ sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+
+ val gauges = getCodahaleRegistry.getGauges
+ val tableMetrics = gauges
+ .entrySet()
+ .toArray
+ .map(_.asInstanceOf[java.util.Map.Entry[String, com.codahale.metrics.Gauge[_]]])
+ .filter(_.getKey.startsWith("paimon.T.commit."))
+
+ val recordsAppended = tableMetrics
+ .find(_.getKey.endsWith(".lastDeltaRecordsAppended"))
+ Assertions.assertTrue(recordsAppended.isDefined, "Missing lastDeltaRecordsAppended metric")
+ Assertions.assertEquals(2L, recordsAppended.get.getValue.getValue)
+
+ val partitionsWritten = tableMetrics
+ .find(_.getKey.endsWith(".lastPartitionsWritten"))
+ Assertions.assertTrue(partitionsWritten.isDefined, "Missing lastPartitionsWritten metric")
+ Assertions.assertEquals(2L, partitionsWritten.get.getValue.getValue)
+ }
+ }
+
+ test("Paimon Metric: Codahale metrics update on subsequent commits") {
+ withTable("T") {
+ sql("CREATE TABLE T (id INT, name STRING)")
+ sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ def getGaugeValue(suffix: String): Long = {
+ val gauges = getCodahaleRegistry.getGauges
+ val entry = gauges
+ .entrySet()
+ .toArray
+ .map(_.asInstanceOf[java.util.Map.Entry[String, com.codahale.metrics.Gauge[_]]])
+ .find(e => e.getKey.startsWith("paimon.T.commit.") && e.getKey.endsWith(suffix))
+ entry.map(_.getValue.getValue.asInstanceOf[Long]).getOrElse(-1L)
+ }
+
+ Assertions.assertEquals(3L, getGaugeValue(".lastDeltaRecordsAppended"))
+
+ sql("INSERT INTO T VALUES (4, 'd'), (5, 'e')")
+ Assertions.assertEquals(2L, getGaugeValue(".lastDeltaRecordsAppended"))
+ }
+ }
+
def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
metrics.find(_.name() == name).get.value()
}