Skip to content

Commit 7c8a8d9

Browse files
committed
refactor: introduce CometInvokeExpressionSerde trait for generic Invoke dispatch
1 parent 9b114fb commit 7c8a8d9

File tree

3 files changed

+117
-73
lines changed

3 files changed

+117
-73
lines changed
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.comet.serde
21+
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
23+
24+
/**
 * Contract for serializing a Spark `Invoke` expression whose first child is a `Literal` of
 * `ObjectType` holding an evaluator instance.
 *
 * Background: in Spark 4.0 several built-in functions (e.g. `parse_url`) became
 * `RuntimeReplaceable` and are rewritten by the analyser into `Invoke(evaluator, arg, ...)`.
 * The expression class Comet would normally dispatch on (e.g. `ParseUrl`) is therefore never
 * seen at serde time on Spark 4.0, and dispatch has to key on the evaluator class instead.
 *
 * An implementation provides two things:
 *   - [[invokeTargetClassName]]: the fully-qualified name of the evaluator class, used by
 *     `QueryPlanSerde` as the routing key.
 *   - [[convertFromInvoke]]: the serialization logic itself.
 *
 * New handlers are registered by adding the implementing object to
 * `QueryPlanSerde.invokeSerdeByTargetClassName`.
 *
 * NOTE(review): `convertFromInvoke` is documented as receiving the raw `Invoke` node, yet its
 * `expr` parameter is typed as `T`. An implementor that binds `T` to the pre-rewrite class
 * (e.g. `ParseUrl`) could not accept an `Invoke` instance - confirm the intended binding of `T`.
 *
 * @tparam T
 *   Static type of the expression handed to [[convertFromInvoke]].
 */
trait CometInvokeExpressionSerde[T <: Expression] {

  /**
   * Routing key: fully-qualified class name of the evaluator object carried by the leading
   * `Literal(_, ObjectType(...))` child of the `Invoke` node.
   *
   * Example: `"org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator"`
   */
  def invokeTargetClassName: String

  /**
   * Turns the `Invoke` expression into a Comet proto `Expr`.
   *
   * @param expr
   *   The `Invoke` expression node, with all of its children; the first child is the evaluator
   *   literal.
   * @param inputs
   *   Resolved input attributes of the enclosing operator.
   * @param binding
   *   Whether attributes are bound (relevant for aggregate expressions).
   * @return
   *   `Some(Expr)` when serialization succeeds; `None` when the expression cannot be handled,
   *   in which case the implementor is expected to have recorded the reason via `withInfo`.
   */
  def convertFromInvoke(
      expr: T,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr]
}

spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,12 @@ import org.apache.comet.shims.CometExprShim
4444
*/
4545
object QueryPlanSerde extends Logging with CometExprShim {
4646

47-
// Generic serializer contract for Spark Invoke expressions.
48-
private type InvokeConverter = (Expression, Seq[Attribute], Boolean) => Option[Expr]
49-
50-
// Dispatch table keyed by the runtime class name stored in Invoke target ObjectType.
51-
private val invokeConvertersByTargetClassName: Map[String, InvokeConverter] = Map(
52-
CometParseUrl.invokeTargetClassName ->
53-
((expr: Expression, inputs: Seq[Attribute], binding: Boolean) =>
54-
CometParseUrl.convertFromInvoke(expr, inputs, binding)))
47+
// Lookup table of Invoke-expression handlers. Each handler is indexed by its own
// `invokeTargetClassName`, i.e. the fully-qualified class name of the evaluator object carried
// by the first Literal(_, ObjectType(...)) child of the Invoke node.
// Supporting another RuntimeReplaceable expression that Spark 4.0 rewrites to Invoke means
// implementing CometInvokeExpressionSerde and appending the object to `handlers` below.
private val invokeSerdeByTargetClassName
    : Map[String, CometInvokeExpressionSerde[_ <: Expression]] = {
  val handlers: Seq[CometInvokeExpressionSerde[_ <: Expression]] = Seq(CometParseUrl)
  handlers.map(handler => handler.invokeTargetClassName -> handler).toMap
}
5553

5654
// Extracts the target object class name from an Invoke-like expression.
5755
private def invokeTargetClassName(expr: Expression): Option[String] = {
@@ -63,17 +61,16 @@ object QueryPlanSerde extends Logging with CometExprShim {
6361
}
6462
}
6563

66-
// Routes Invoke expressions to a converter based on the target object class name.
64+
// Routes an Invoke expression to the handler registered for its target object class name.
//
// Yields None when any step fails: the target class name cannot be extracted from `expr`,
// no handler is registered under that name, or the handler itself returns None.
//
// expr:    the expression to serialize (expected to be an Invoke-like node)
// inputs:  resolved input attributes of the enclosing operator
// binding: whether attributes are bound
private def convertInvokeExpression(
    expr: Expression,
    inputs: Seq[Attribute],
    binding: Boolean): Option[Expr] =
  for {
    targetClassName <- invokeTargetClassName(expr)
    handler <- invokeSerdeByTargetClassName.get(targetClassName)
    // The registry stores handlers as CometInvokeExpressionSerde[_ <: Expression]. With that
    // existential type the compiler rejects passing a plain Expression to convertFromInvoke,
    // so the handler is widened first. The type parameter is erased at runtime: a handler
    // whose T is narrower than the node actually passed in will fail with a
    // ClassCastException inside the handler, so registered handlers must accept the raw
    // Invoke node.
    result <- handler
      .asInstanceOf[CometInvokeExpressionSerde[Expression]]
      .convertFromInvoke(expr, inputs, binding)
  } yield result
7774

7875
private val arrayExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
7976
classOf[ArrayAppend] -> CometArrayAppend,

spark/src/main/scala/org/apache/comet/serde/strings.scala

Lines changed: 37 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -383,55 +383,52 @@ object CometStringSplit extends CometExpressionSerde[StringSplit] {
383383
}
384384
}
385385

386-
object CometParseUrl extends CometExpressionSerde[ParseUrl] {
386+
object CometParseUrl extends CometExpressionSerde[ParseUrl] with CometInvokeExpressionSerde[ParseUrl] {
387387

388-
// Class name of the Spark 4.0 internal evaluator embedded in the Invoke node that replaces
389-
// ParseUrl at analysis time (RuntimeReplaceable). This constant is used as the dispatch key
390-
// in QueryPlanSerde.invokeConvertersByTargetClassName.
391-
val invokeTargetClassName: String =
388+
// Dispatch key for the Spark 4.0 path. There, ParseUrl is RuntimeReplaceable and the analyser
// turns it into Invoke(ParseUrlEvaluator.evaluate, url, part[, key]), whose first child is
// Literal(evaluator, ObjectType(ParseUrlEvaluator)). QueryPlanSerde compares the class name of
// that ObjectType with this value to route the Invoke node to this handler.
override val invokeTargetClassName: String =
  "org.apache.spark.sql.catalyst.expressions.url.ParseUrlEvaluator"
393394

394-
// ---------------------------------------------------------------------------
395-
// Spark 4.0 Invoke path helpers
396-
// ---------------------------------------------------------------------------
397-
398-
// Extracts the failOnError flag from the ParseUrlEvaluator instance embedded in the
399-
// Invoke literal. Uses reflection because ParseUrlEvaluator is a private class.
400-
// Falls back to SQLConf.get.ansiEnabled when reflection fails (evaluator is null, or
401-
// the method has been renamed in a future Spark version).
402-
private[serde] def failOnErrorFromInvoke(expr: Expression): Boolean =
403-
expr.children.headOption match {
404-
case Some(Literal(evaluator, objectType: ObjectType))
405-
if evaluator != null && objectType.cls.getName == invokeTargetClassName =>
406-
try {
407-
evaluator.getClass
408-
.getMethod("failOnError")
409-
.invoke(evaluator)
410-
.asInstanceOf[Boolean]
411-
} catch {
412-
case _: ReflectiveOperationException => SQLConf.get.ansiEnabled
413-
}
414-
case _ =>
415-
SQLConf.get.ansiEnabled
395+
// Reads the `failOnError` flag from an evaluator instance by reflectively calling its public
// zero-argument `failOnError` method.
//
// Falls back to SQLConf.get.ansiEnabled whenever the flag cannot be read:
//   - `evaluator` is null,
//   - the reflective lookup or call fails (e.g. the accessor was renamed in a later Spark
//     version), or
//   - the accessor does not return a Boolean.
private def failOnErrorFromEvaluator(evaluator: AnyRef): Boolean = {
  val reflected: Option[Boolean] =
    try {
      // Option(...) turns a null evaluator into None instead of a NullPointerException.
      Option(evaluator)
        .map(target => target.getClass.getMethod("failOnError").invoke(target))
        // A type test instead of asInstanceOf: an unexpected return type (or a null result)
        // selects the fallback rather than throwing ClassCastException.
        .collect { case flag: java.lang.Boolean => flag.booleanValue() }
    } catch {
      case _: ReflectiveOperationException => None
    }
  reflected.getOrElse(SQLConf.get.ansiEnabled)
}
417404

418-
// Drops the leading ParseUrlEvaluator literal from the Invoke children list, leaving
419-
// only the actual URL/part/key arguments.
420-
private def dropEvaluatorLiteral(children: Seq[Expression]): Seq[Expression] =
421-
children.headOption match {
422-
case Some(Literal(_, objectType: ObjectType))
405+
// Invoke-path entry point.
//
// When the first child is Literal(evaluator, ObjectType(...)) and the ObjectType's class name
// equals `invokeTargetClassName`, that literal is removed from the argument list and the
// failOnError flag is read from the evaluator instance (or taken from SQLConf.get.ansiEnabled
// if the literal's value is null). Otherwise all children are passed through unchanged and
// SQLConf.get.ansiEnabled supplies the flag.
//
// NOTE(review): `expr` is typed as ParseUrl, while the comments in this object say Spark 4.0
// replaces ParseUrl with an Invoke node before serde - confirm that the node reaching this
// method can actually be a ParseUrl.
override def convertFromInvoke(
    expr: ParseUrl,
    inputs: Seq[Attribute],
    binding: Boolean): Option[Expr] = {
  // Evaluated only in the branches that need the session-level default.
  def ansiFallback: Boolean = SQLConf.get.ansiEnabled

  expr.children match {
    case Literal(target, targetType: ObjectType) +: callArgs
        if targetType.cls.getName == invokeTargetClassName =>
      // callArgs are the remaining (url, part[, key]) arguments.
      val strict =
        Option(target).fold(ansiFallback)(t => failOnErrorFromEvaluator(t.asInstanceOf[AnyRef]))
      toProto(expr, callArgs, strict, inputs, binding)
    case allArgs =>
      toProto(expr, allArgs, ansiFallback, inputs, binding)
  }
}
423+
424+
// Spark 3.5 entry point: ParseUrl is a concrete expression node, so both its children and its
// `failOnError` field are read directly from `expr`, with no reflection involved.
override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] = {
  val urlArgs = expr.children
  toProto(expr, urlArgs, expr.failOnError, inputs, binding)
}
428428

429-
// ---------------------------------------------------------------------------
430-
// Core serialization
431-
// ---------------------------------------------------------------------------
432429

433-
// Converts the parse_url/try_parse_url arguments into a proto Expr.
434-
// `urlArgs` must already be stripped of the evaluator literal and the failOnError flag literal.
430+
// Serializes (url, part[, key]) arguments into the appropriate native function call.
431+
// Uses parse_url (ANSI/strict) or try_parse_url (legacy/lenient) depending on failOnError.
435432
private def toProto(
436433
expr: Expression,
437434
urlArgs: Seq[Expression],
@@ -443,25 +440,6 @@ object CometParseUrl extends CometExpressionSerde[ParseUrl] {
443440
val optExpr = scalarFunctionExprToProto(functionName, childExprs: _*)
444441
optExprWithInfo(optExpr, expr, urlArgs: _*)
445442
}
446-
447-
// ---------------------------------------------------------------------------
448-
// Public entry points
449-
// ---------------------------------------------------------------------------
450-
451-
// Spark 3.5 path: ParseUrl is a concrete expression node with a `failOnError` field.
452-
override def convert(expr: ParseUrl, inputs: Seq[Attribute], binding: Boolean): Option[Expr] =
453-
toProto(expr, expr.children, expr.failOnError, inputs, binding)
454-
455-
// Spark 4.0 path: ParseUrl is replaced by Invoke(ParseUrlEvaluator, url, part[, key]).
456-
// Called from QueryPlanSerde.convertInvokeExpression via invokeConvertersByTargetClassName.
457-
def convertFromInvoke(
458-
expr: Expression,
459-
inputs: Seq[Attribute],
460-
binding: Boolean): Option[Expr] = {
461-
val failOnError = failOnErrorFromInvoke(expr)
462-
val urlArgs = dropEvaluatorLiteral(expr.children)
463-
toProto(expr, urlArgs, failOnError, inputs, binding)
464-
}
465443
}
466444

467445
trait CommonStringExprs {

0 commit comments

Comments
 (0)