diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index d36b6a3b40..f093b6a74c 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -21,7 +21,7 @@ package org.apache.comet.serde import java.util.Locale -import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} +import org.apache.spark.sql.catalyst.expressions.{Attribute, DateAdd, DateDiff, DateFormatClass, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Expression, GetDateField, Hour, LastDay, Literal, MakeDate, Minute, Month, NextDay, Quarter, Second, TruncDate, TruncTimestamp, UnixDate, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType} import org.apache.spark.unsafe.types.UTF8String @@ -541,3 +541,36 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { } } } + +trait CommonDateTimeExprs { + + def secondsOfTimeToProto( + expr: Expression, + inputs: Seq[Attribute], + binding: Boolean): Option[Expr] = { + val childOpt = expr.children.headOption.orElse { + withInfo(expr, "SecondsOfTime has no child expression") + None + } + + childOpt.flatMap { child => + exprToProtoInternal(child, inputs, binding) + .map { childExpr => + val builder = ExprOuterClass.Second.newBuilder() + builder.setChild(childExpr) + + // SecondsOfTime does not carry a timeZoneId; assume UTC. 
+ builder.setTimezone("UTC") + + ExprOuterClass.Expr + .newBuilder() + .setSecond(builder) + .build() + } + .orElse { + withInfo(expr, child) + None + } + } + } +} diff --git a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala index 600931c346..3d36d0ace0 100644 --- a/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.4/org/apache/comet/shims/CometExprShim.scala @@ -22,13 +22,13 @@ package org.apache.comet.shims import org.apache.spark.sql.catalyst.expressions._ import org.apache.comet.expressions.CometEvalMode -import org.apache.comet.serde.CommonStringExprs +import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. */ -trait CometExprShim extends CommonStringExprs { +trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) @@ -43,6 +43,10 @@ trait CometExprShim extends CommonStringExprs { // Right child is the encoding expression. 
stringDecode(expr, s.charset, s.bin, inputs, binding) + case _: UnaryExpression if expr.prettyName == "seconds_of_time" => + // SecondsOfTime may not exist in all Spark versions, so we match by prettyName + secondsOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala index 8e9cb1c07b..b61a5a4b18 100644 --- a/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-3.5/org/apache/comet/shims/CometExprShim.scala @@ -24,14 +24,14 @@ import org.apache.spark.sql.types.DataTypes import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. 
*/ -trait CometExprShim extends CommonStringExprs { +trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) @@ -91,6 +91,10 @@ trait CometExprShim extends CommonStringExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) + case _: UnaryExpression if expr.prettyName == "seconds_of_time" => + // SecondsOfTime may not exist in all Spark versions, so we match by prettyName + secondsOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala index 2c5cebd166..de8eef389a 100644 --- a/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala +++ b/spark/src/main/spark-4.0/org/apache/comet/shims/CometExprShim.scala @@ -27,14 +27,14 @@ import org.apache.spark.sql.types.{BinaryType, BooleanType, DataTypes, StringTyp import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.expressions.{CometCast, CometEvalMode} -import org.apache.comet.serde.{CommonStringExprs, Compatible, ExprOuterClass, Incompatible} +import org.apache.comet.serde.{CommonDateTimeExprs, CommonStringExprs, Compatible, ExprOuterClass, Incompatible} import org.apache.comet.serde.ExprOuterClass.{BinaryOutputStyle, Expr} import org.apache.comet.serde.QueryPlanSerde.exprToProtoInternal /** * `CometExprShim` acts as a shim for parsing expressions from different Spark versions. 
 */ -trait CometExprShim extends CommonStringExprs { +trait CometExprShim extends CommonStringExprs with CommonDateTimeExprs { protected def evalMode(c: Cast): CometEvalMode.Value = CometEvalModeUtil.fromSparkEvalMode(c.evalMode) @@ -113,6 +113,10 @@ trait CometExprShim extends CommonStringExprs { // val optExpr = scalarFunctionExprToProto("width_bucket", childExprs: _*) // optExprWithInfo(optExpr, wb, wb.children: _*) + case _: UnaryExpression if expr.prettyName == "seconds_of_time" => + // SecondsOfTime may not exist in all Spark versions, so we match by prettyName + secondsOfTimeToProto(expr, inputs, binding) + case _ => None } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index f0f022868f..fb7b096b84 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -572,6 +572,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("seconds_of_time expression") { + // This test verifies that the second() function works correctly with timestamp columns. + // If Spark generates a SecondsOfTime expression (a RuntimeReplaceable expression), + // it will be handled by the version-specific shim and converted to Second proto. + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "part-r-0.parquet") + makeRawTimeParquetFile(path, dictionaryEnabled = dictionaryEnabled, 10000) + readParquetFile(path.toString) { df => + val query = df.select(expr("second(_1)")) + + checkSparkAnswerAndOperator(query) + } + } + } + } + test("hour on int96 timestamp column") { import testImplicits._