diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/metric/SparkMetricGroup.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/metric/SparkMetricGroup.java new file mode 100644 index 000000000000..4eb30bf054f1 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/metric/SparkMetricGroup.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.metric; + +import org.apache.paimon.metrics.Counter; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.metrics.MetricGroupImpl; + +import com.codahale.metrics.MetricRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A {@link org.apache.paimon.metrics.MetricGroup} that extends {@link MetricGroupImpl} to + * additionally register metrics with a Codahale {@link MetricRegistry} for exposure via Spark's + * MetricsSystem (JMX, Prometheus, etc.). + * + *

This preserves full backward compatibility with the existing Spark UI integration (which reads + * from {@link MetricGroupImpl#getMetrics()}) while also making metrics available to external + * monitoring systems. + */ +public class SparkMetricGroup extends MetricGroupImpl { + + private static final Logger LOG = LoggerFactory.getLogger(SparkMetricGroup.class); + + private final MetricRegistry codahaleRegistry; + private final String metricPrefix; + private final Set registeredNames; + + public SparkMetricGroup( + String groupName, Map variables, MetricRegistry codahaleRegistry) { + super(groupName, variables); + this.codahaleRegistry = codahaleRegistry; + this.metricPrefix = buildPrefix(groupName, variables); + this.registeredNames = new HashSet<>(); + } + + @Override + public Counter counter(String name) { + Counter paimonCounter = super.counter(name); + if (paimonCounter != null) { + registerCodahaleGauge( + codahaleName(name), (com.codahale.metrics.Gauge) paimonCounter::getCount); + } + return paimonCounter; + } + + @SuppressWarnings("unchecked") + @Override + public Gauge gauge(String name, Gauge gauge) { + Gauge paimonGauge = super.gauge(name, gauge); + if (paimonGauge != null) { + registerCodahaleGauge( + codahaleName(name), (com.codahale.metrics.Gauge) paimonGauge::getValue); + } + return paimonGauge; + } + + @Override + public Histogram histogram(String name, int windowSize) { + Histogram paimonHistogram = super.histogram(name, windowSize); + if (paimonHistogram != null) { + String base = codahaleName(name); + registerCodahaleGauge( + base + ".count", (com.codahale.metrics.Gauge) paimonHistogram::getCount); + registerCodahaleGauge( + base + ".mean", + (com.codahale.metrics.Gauge) + () -> paimonHistogram.getStatistics().getMean()); + registerCodahaleGauge( + base + ".min", + (com.codahale.metrics.Gauge) + () -> paimonHistogram.getStatistics().getMin()); + registerCodahaleGauge( + base + ".max", + (com.codahale.metrics.Gauge) + () -> paimonHistogram.getStatistics().getMax()); + registerCodahaleGauge( + base + ".p50", + (com.codahale.metrics.Gauge) + () -> paimonHistogram.getStatistics().getQuantile(0.5)); + registerCodahaleGauge( + base + ".p95", + (com.codahale.metrics.Gauge) + () -> paimonHistogram.getStatistics().getQuantile(0.95)); + registerCodahaleGauge( + base + ".p99", + (com.codahale.metrics.Gauge) + () -> paimonHistogram.getStatistics().getQuantile(0.99)); + } + return paimonHistogram; + } + + @Override + public void close() { + for (String name : registeredNames) { + codahaleRegistry.remove(name); + } + registeredNames.clear(); + super.close(); + } + + private String codahaleName(String metricName) { + return metricPrefix + "." + metricName; + } + + private void registerCodahaleGauge(String name, com.codahale.metrics.Gauge gauge) { + // Always remove first to ensure we replace any stale gauge from a previous commit. + // This avoids a race window in the try-register/catch-remove/re-register pattern. + codahaleRegistry.remove(name); + try { + codahaleRegistry.register(name, gauge); + registeredNames.add(name); + } catch (IllegalArgumentException e) { + LOG.warn("Failed to register Codahale metric '{}': {}", name, e.getMessage()); + } + } + + private static String buildPrefix(String groupName, Map variables) { + StringBuilder sb = new StringBuilder("paimon"); + String table = variables.get("table"); + if (table != null && !table.isEmpty()) { + sb.append('.').append(sanitize(table)); + } + sb.append('.').append(sanitize(groupName)); + return sb.toString(); + } + + private static String sanitize(String name) { + return name.replaceAll("[^a-zA-Z0-9_]", "_"); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 26ca12197c70..dfb00ce31f9f 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -32,6 +32,7 @@ import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.manifest.FileKind import org.apache.paimon.spark.{SparkRow, SparkTypeUtils} import org.apache.paimon.spark.catalog.functions.BucketFunction +import org.apache.paimon.spark.metric.SparkMetricRegistry import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL} import org.apache.paimon.spark.sort.TableSorter import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled @@ -421,7 +422,9 @@ case class PaimonSparkWriter( } else { writeBuilder } + val metricRegistry = SparkMetricRegistry() val tableCommit = finalWriteBuilder.newCommit() + tableCommit.withMetricRegistry(metricRegistry) try { tableCommit.commit(commitMessages.toList.asJava) } catch { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala index 9aeeed7a03cb..73cdf440bd87 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/metric/SparkMetricRegistry.scala @@ -18,11 +18,12 @@ package org.apache.paimon.spark.metric -import org.apache.paimon.metrics.{Gauge, Metric, MetricGroup, MetricGroupImpl, MetricRegistry} +import org.apache.paimon.metrics.{Gauge, Metric, MetricGroup, MetricRegistry} import org.apache.paimon.operation.metrics.{CommitMetrics, ScanMetrics, WriterBufferMetric} import org.apache.paimon.spark._ import org.apache.spark.sql.connector.metric.CustomTaskMetric +import org.apache.spark.sql.paimon.PaimonMetricsSource import java.util.{Map => JMap} @@ -35,7 +36,8 @@ case class SparkMetricRegistry() extends MetricRegistry { override def createMetricGroup( groupName: String, variables: JMap[String, String]): MetricGroup = { - val metricGroup = new MetricGroupImpl(groupName, variables) + val source = PaimonMetricsSource.getOrCreate() + val metricGroup = new SparkMetricGroup(groupName, variables, source.metricRegistry) metricGroups.put(groupName, metricGroup) metricGroup } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonMetricsSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonMetricsSource.scala new file mode 100644 index 000000000000..94e569863105 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/PaimonMetricsSource.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.paimon + +import com.codahale.metrics.jmx.JmxReporter +import org.apache.spark.SparkEnv +import org.apache.spark.metrics.source.Source + +/** + * A Spark [[Source]] that holds a shared Codahale [[com.codahale.metrics.MetricRegistry]]. Paimon + * metrics registered via [[org.apache.paimon.spark.metric.SparkMetricGroup]] are exposed through + * this source. + * + * A [[JmxReporter]] is started on the registry so that metrics are registered as JMX MBeans + * immediately when added. This is necessary because Spark's MetricsSystem takes a snapshot of the + * registry at `registerSource` time and does not pick up metrics added later. + * + * This class must live under `org.apache.spark` because Spark's [[Source]] trait is + * package-private. + * + * Use [[PaimonMetricsSource.getOrCreate()]] to obtain the singleton instance. + */ +class PaimonMetricsSource extends Source { + + override val sourceName: String = "paimon" + + override val metricRegistry: com.codahale.metrics.MetricRegistry = + new com.codahale.metrics.MetricRegistry() + + private val jmxReporter: JmxReporter = JmxReporter + .forRegistry(metricRegistry) + .inDomain("paimon") + .build() + + jmxReporter.start() + + def stop(): Unit = { + jmxReporter.stop() + } +} + +object PaimonMetricsSource { + + @volatile private var instance: PaimonMetricsSource = _ + + def getOrCreate(): PaimonMetricsSource = { + if (instance == null) { + synchronized { + if (instance == null) { + val source = new PaimonMetricsSource() + val env = SparkEnv.get + if (env != null) { + env.metricsSystem.registerSource(source) + } + Runtime.getRuntime.addShutdownHook(new Thread("paimon-jmx-reporter-shutdown") { + override def run(): Unit = source.stop() + }) + instance = source + } + } + } + instance + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/metric/SparkMetricGroupTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/metric/SparkMetricGroupTest.java new file mode 100644 index 000000000000..16bc97f6a85e --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/metric/SparkMetricGroupTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.metric; + +import org.apache.paimon.metrics.Counter; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.metrics.Metric; + +import com.codahale.metrics.MetricRegistry; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SparkMetricGroup}. */ +public class SparkMetricGroupTest { + + @Test + public void testGaugeRegisteredInCodahaleRegistry() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "myTable"); + + SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry); + group.gauge("lastCommitDuration", () -> 42L); + + // Verify the Paimon metric is registered + Map paimonMetrics = group.getMetrics(); + assertThat(paimonMetrics).containsKey("lastCommitDuration"); + assertThat(((Gauge) paimonMetrics.get("lastCommitDuration")).getValue()) + .isEqualTo(42L); + + // Verify the Codahale metric is registered with the correct name + String expectedName = "paimon.myTable.commit.lastCommitDuration"; + assertThat(codahaleRegistry.getGauges()).containsKey(expectedName); + assertThat(codahaleRegistry.getGauges().get(expectedName).getValue()).isEqualTo(42L); + } + + @Test + public void testCounterRegisteredInCodahaleRegistry() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "myTable"); + + SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry); + Counter counter = group.counter("numCommits"); + counter.inc(); + counter.inc(); + counter.inc(); + + // Verify the Codahale gauge wraps the counter's count + String expectedName = "paimon.myTable.commit.numCommits"; + assertThat(codahaleRegistry.getGauges()).containsKey(expectedName); + assertThat(codahaleRegistry.getGauges().get(expectedName).getValue()).isEqualTo(3L); + } + + @Test + public void testHistogramRegisteredInCodahaleRegistry() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "myTable"); + + SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry); + Histogram histogram = group.histogram("commitDuration", 100); + histogram.update(100); + histogram.update(200); + histogram.update(300); + + String prefix = "paimon.myTable.commit.commitDuration"; + + // Verify all histogram sub-metrics are registered + assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".count"); + assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".mean"); + assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".min"); + assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".max"); + assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".p50"); + assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".p95"); + assertThat(codahaleRegistry.getGauges()).containsKey(prefix + ".p99"); + + // Verify values + assertThat(codahaleRegistry.getGauges().get(prefix + ".count").getValue()).isEqualTo(3L); + assertThat(codahaleRegistry.getGauges().get(prefix + ".min").getValue()).isEqualTo(100L); + assertThat(codahaleRegistry.getGauges().get(prefix + ".max").getValue()).isEqualTo(300L); + } + + @Test + public void testGaugeValueUpdatesLazily() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "myTable"); + + SparkMetricGroup group = new SparkMetricGroup("scan", variables, codahaleRegistry); + + long[] holder = {0L}; + group.gauge("lastScanDuration", () -> holder[0]); + + String codahaleName = "paimon.myTable.scan.lastScanDuration"; + assertThat(codahaleRegistry.getGauges().get(codahaleName).getValue()).isEqualTo(0L); + + // Mutate the backing value and verify the Codahale gauge reflects it + holder[0] = 999L; + assertThat(codahaleRegistry.getGauges().get(codahaleName).getValue()).isEqualTo(999L); + } + + @Test + public void testCloseRemovesCodahaleMetrics() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "myTable"); + + SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry); + group.gauge("g1", () -> 1L); + group.counter("c1"); + group.histogram("h1", 100); + + // Verify metrics exist + assertThat(codahaleRegistry.getGauges()).isNotEmpty(); + + group.close(); + + // Verify all Codahale metrics are removed + assertThat(codahaleRegistry.getMetrics()).isEmpty(); + } + + @Test + public void testMetricNameSanitization() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "my-db.my-table"); + + SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry); + group.gauge("someMetric", () -> 1L); + + // Special characters in table name should be replaced with underscores + String expectedName = "paimon.my_db_my_table.commit.someMetric"; + assertThat(codahaleRegistry.getGauges()).containsKey(expectedName); + } + + @Test + public void testMetricNameWithoutTableVariable() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + + SparkMetricGroup group = new SparkMetricGroup("scan", variables, codahaleRegistry); + group.gauge("duration", () -> 100L); + + // Without a table variable, the prefix should be paimon. + String expectedName = "paimon.scan.duration"; + assertThat(codahaleRegistry.getGauges()).containsKey(expectedName); + } + + @Test + public void testMetricReplaceOnCollision() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "myTable"); + + // Pre-register a Codahale gauge with the same name + String metricName = "paimon.myTable.commit.existingMetric"; + codahaleRegistry.register(metricName, (com.codahale.metrics.Gauge) () -> -1L); + assertThat(codahaleRegistry.getGauges().get(metricName).getValue()).isEqualTo(-1L); + + // Now register via SparkMetricGroup - should replace the existing one + SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry); + group.gauge("existingMetric", () -> 42L); + + assertThat(codahaleRegistry.getGauges().get(metricName).getValue()).isEqualTo(42L); + } + + @Test + public void testPaimonMetricsPreservedForSparkUI() { + MetricRegistry codahaleRegistry = new MetricRegistry(); + Map variables = new HashMap<>(); + variables.put("table", "myTable"); + + SparkMetricGroup group = new SparkMetricGroup("commit", variables, codahaleRegistry); + group.gauge("lastCommitDuration", () -> 100L); + group.counter("numOps"); + group.histogram("commitDuration", 100); + + // Verify getMetrics() still works for Spark UI integration (backward compat) + Map paimonMetrics = group.getMetrics(); + assertThat(paimonMetrics).containsKey("lastCommitDuration"); + assertThat(paimonMetrics).containsKey("numOps"); + assertThat(paimonMetrics).containsKey("commitDuration"); + assertThat(paimonMetrics).hasSize(3); + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala index c74db8744e70..8af90a3d06a4 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala @@ -193,6 +193,88 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper { } } + private def getCodahaleRegistry: com.codahale.metrics.MetricRegistry = { + val sourceClass = Class.forName("org.apache.spark.sql.paimon.PaimonMetricsSource$") + val companion = sourceClass.getField("MODULE$").get(null) + val source = sourceClass.getMethod("getOrCreate").invoke(companion) + source.getClass + .getMethod("metricRegistry") + .invoke(source) + .asInstanceOf[com.codahale.metrics.MetricRegistry] + } + + test("Paimon Metric: commit metrics registered in Codahale registry after write") { + withTable("T") { + sql("CREATE TABLE T (id INT, name STRING)") + sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')") + + val gauges = getCodahaleRegistry.getGauges + val commitGauges = + gauges.keySet().toArray.map(_.toString).filter(_.startsWith("paimon.T.commit.")) + + Assertions.assertTrue( + commitGauges.nonEmpty, + s"Expected commit metrics in Codahale registry but found none. " + + s"All metrics: ${gauges.keySet()}") + + Assertions.assertTrue( + commitGauges.exists(_.endsWith(".lastCommitDuration")), + "Missing lastCommitDuration metric") + Assertions.assertTrue( + commitGauges.exists(_.endsWith(".lastTableFilesAdded")), + "Missing lastTableFilesAdded metric") + Assertions.assertTrue( + commitGauges.exists(_.endsWith(".lastDeltaRecordsAppended")), + "Missing lastDeltaRecordsAppended metric") + } + } + + test("Paimon Metric: Codahale metrics reflect actual commit values") { + withTable("T") { + sql("CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt)") + sql("INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')") + + val gauges = getCodahaleRegistry.getGauges + val tableMetrics = gauges + .entrySet() + .toArray + .map(_.asInstanceOf[java.util.Map.Entry[String, com.codahale.metrics.Gauge[_]]]) + .filter(_.getKey.startsWith("paimon.T.commit.")) + + val recordsAppended = tableMetrics + .find(_.getKey.endsWith(".lastDeltaRecordsAppended")) + Assertions.assertTrue(recordsAppended.isDefined, "Missing lastDeltaRecordsAppended metric") + Assertions.assertEquals(2L, recordsAppended.get.getValue.getValue) + + val partitionsWritten = tableMetrics + .find(_.getKey.endsWith(".lastPartitionsWritten")) + Assertions.assertTrue(partitionsWritten.isDefined, "Missing lastPartitionsWritten metric") + Assertions.assertEquals(2L, partitionsWritten.get.getValue.getValue) + } + } + + test("Paimon Metric: Codahale metrics update on subsequent commits") { + withTable("T") { + sql("CREATE TABLE T (id INT, name STRING)") + sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + def getGaugeValue(suffix: String): Long = { + val gauges = getCodahaleRegistry.getGauges + val entry = gauges + .entrySet() + .toArray + .map(_.asInstanceOf[java.util.Map.Entry[String, com.codahale.metrics.Gauge[_]]]) + .find(e => e.getKey.startsWith("paimon.T.commit.") && e.getKey.endsWith(suffix)) + entry.map(_.getValue.getValue.asInstanceOf[Long]).getOrElse(-1L) + } + + Assertions.assertEquals(3L, getGaugeValue(".lastDeltaRecordsAppended")) + + sql("INSERT INTO T VALUES (4, 'd'), (5, 'e')") + Assertions.assertEquals(2L, getGaugeValue(".lastDeltaRecordsAppended")) + } + } + def metric(metrics: Array[CustomTaskMetric], name: String): Long = { metrics.find(_.name() == name).get.value() }