diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java index 84d52fddced..fb434e08752 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/tuple/Attribute.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.texera.amber.pybuilder.EncodableStringAnnotation; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -49,6 +50,7 @@ public Attribute( @JsonProperty(value = "attributeName", required = true) @NotBlank(message = "Attribute name is required") + @EncodableStringAnnotation public String getName() { return attributeName; } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala new file mode 100644 index 00000000000..70836cbeda4 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjector.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} +import org.apache.texera.amber.pybuilder.PythonTemplateBuilder +import org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext + +import scala.util.matching.Regex + +/** + * Injects the reserved UI-parameter hook into user-written Python UDF code. + * + * Operator descriptors should call this after loading saved [[UiUDFParameter]] values and before sending Python source + * to runtime execution. The injected hook returns decoded parameter names and values that Python runtime support reads + * before the user's `open()` method runs. + */ +object PythonUdfUiParameterInjector { + + private val InjectedUiParametersHookMethodName = "_texera_injected_ui_parameters" + private val InjectedUiParametersHookMethodHeader = + s"def $InjectedUiParametersHookMethodName(self) -> Dict[str, Any]:" + private val UnsupportedUiParameterTypes = Set(AttributeType.BINARY, AttributeType.LARGE_BINARY) + + // Keep supported user-facing UDF class names in sync with the frontend parser. + private val SupportedPythonUdfClassHeaderRegex: Regex = + """(?m)^([ \t]*)class\s+(ProcessTupleOperator|ProcessBatchOperator|ProcessTableOperator|GenerateOperator)\s*\([^)]*\)\s*:\s*(?:#.*)?$""".r + + private def validate(uiParameters: List[UiUDFParameter]): Unit = { + val attributes = uiParameters.map(parameterAttribute) + attributes.foreach(validateSupportedType) + + attributes + .groupBy(_.getName) + .collectFirst { + case (parameterName, matchingAttributes) if matchingAttributes.size > 1 => parameterName + } + .foreach { duplicateName => + throw new RuntimeException(s"UiParameter name '$duplicateName' is declared more than once.") + } + } + + private def parameterAttribute(parameter: UiUDFParameter): Attribute = + Option(parameter).flatMap(parameter => Option(parameter.attribute)).getOrElse { + throw new RuntimeException("UiParameter attribute is required.") + } + + private def validateSupportedType(attribute: Attribute): Unit = { + if (UnsupportedUiParameterTypes.contains(attribute.getType)) { + throw new RuntimeException( + s"UiParameter type '${attribute.getType.name()}' is not supported. " + + "Use string, integer, long, double, boolean, or timestamp instead." + ) + } + } + + private def buildInjectedParameterEntry(parameter: UiUDFParameter): PythonTemplateBuilder = { + pyb"${parameter.attribute.getName}: ${parameter.value}" + } + + private def buildInjectedParametersMap( + uiParameters: List[UiUDFParameter] + ): PythonTemplateBuilder = { + val entries = uiParameters.map(buildInjectedParameterEntry) + entries.reduceOption((acc, entry) => acc + pyb", " + entry).getOrElse(pyb"") + } + + private def buildInjectedHookMethod(uiParameters: List[UiUDFParameter]): String = { + val injectedParametersMap = buildInjectedParametersMap(uiParameters) + + (pyb"""|@overrides + |$InjectedUiParametersHookMethodHeader + | return {""" + + injectedParametersMap + + pyb"""} + |""").encode + } + + private def indentBlock(block: String, indent: String): String = { + block + .split("\n", -1) + .map { line => + if (line.nonEmpty) indent + line else line + } + .mkString("\n") + } + + private def lineEndIndex(text: String, from: Int): Int = { + val lineEnd = text.indexOf('\n', from) + if (lineEnd < 0) text.length else lineEnd + } + + private def detectClassBlockEnd(code: String, classHeaderStart: Int, classIndent: String): Int = { + val classLineEnd = lineEndIndex(code, classHeaderStart) + var lineStart = if (classLineEnd < code.length) classLineEnd + 1 else code.length + + while (lineStart < code.length) { + val lineEnd = lineEndIndex(code, lineStart) + val line = code.substring(lineStart, lineEnd) + + val trimmed = line.trim + val isBlank = trimmed.isEmpty + + val currentIndentLen = line.segmentLength(ch => ch == ' ' || ch == '\t') + val classIndentLen = classIndent.length + + if (!isBlank && currentIndentLen <= classIndentLen) { + return lineStart + } + + lineStart = if (lineEnd < code.length) lineEnd + 1 else code.length + } + + code.length + } + + private def containsReservedHook(classBlock: String): Boolean = { + val hookRegex = + ("""(?m)^[ \t]+def\s+""" + Regex.quote(InjectedUiParametersHookMethodName) + """\s*\(""").r + hookRegex.findFirstIn(classBlock).isDefined + } + + private def injectHookIntoUserClass(encodedUserCode: String, hookMethod: String): String = { + val classHeaderMatch = + SupportedPythonUdfClassHeaderRegex.findFirstMatchIn(encodedUserCode).getOrElse { + return encodedUserCode + } + + val classHeaderStart = classHeaderMatch.start + val classIndent = classHeaderMatch.group(1) + val classBlockEnd = detectClassBlockEnd(encodedUserCode, classHeaderStart, classIndent) + + val classBlock = encodedUserCode.substring(classHeaderStart, classBlockEnd) + + if (containsReservedHook(classBlock)) { + throw new RuntimeException( + s"Reserved method '$InjectedUiParametersHookMethodName' is already defined in the UDF class. Please rename your method." + ) + } + + val bodyIndent = inferClassBodyIndent(classBlock, classIndent).getOrElse(classIndent + " ") + val indentedHook = indentBlock( + (if (classBlock.endsWith("\n")) "" else "\n") + hookMethod.trim + "\n", + bodyIndent + ) + + encodedUserCode.substring(0, classBlockEnd) + + indentedHook + + encodedUserCode.substring(classBlockEnd) + } + + private def inferClassBodyIndent(classBlock: String, classIndent: String): Option[String] = { + val lines = classBlock.split("\n", -1).toList.drop(1) + + lines.collectFirst { + case line if line.trim.nonEmpty => + val leading = line.takeWhile(ch => ch == ' ' || ch == '\t') + if (leading.length > classIndent.length) leading else classIndent + " " + } + } + + /** + * Returns Python code with the UI-parameter hook injected into the supported UDF class. + * + * If `uiParameters` is empty, the code is only passed through normal Python-template encoding. Throws + * [[RuntimeException]] when parameter metadata is invalid or the user already defines the reserved hook method. + */ + def inject(code: String, uiParameters: List[UiUDFParameter]): String = { + val parameters = Option(uiParameters).getOrElse(List.empty) + validate(parameters) + + val encodedUserCode = pyb"$code".encode + + if (parameters.isEmpty) { + return encodedUserCode + } + + val hookMethod = buildInjectedHookMethod(parameters) + injectHookIntoUserClass(encodedUserCode, hookMethod) + } +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala new file mode 100644 index 00000000000..b18b9a181d7 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/udf/python/UiUDFParameter.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.udf.python + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.tuple.Attribute +import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString + +import javax.validation.Valid +import javax.validation.constraints.NotNull + +/** + * Serialized operator property for one Python UDF UI parameter. + * + * `attribute` carries the inferred parameter name and type. `value` is user-entered text and is marked as + * [[EncodableString]] so Python code generation decodes it at runtime instead of embedding raw text into generated code. + */ +class UiUDFParameter { + + @JsonProperty(required = true) + @JsonSchemaTitle("Attribute") + @Valid + @NotNull(message = "Attribute is required") + var attribute: Attribute = _ + + @JsonProperty() + @JsonSchemaTitle("Value") + var value: EncodableString = "" +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala new file mode 100644 index 00000000000..3d0fb824075 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/udf/python/PythonUdfUiParameterInjectorSpec.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.texera.amber.operator.udf.python + +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class PythonUdfUiParameterInjectorSpec extends AnyFlatSpec with Matchers { + + private def uiParameter( + attributeName: String, + attributeType: AttributeType, + value: String + ): UiUDFParameter = { + val parameter = new UiUDFParameter + parameter.attribute = new Attribute(attributeName, attributeType) + parameter.value = value + parameter + } + + private def inject(parameters: UiUDFParameter*): String = + PythonUdfUiParameterInjector.inject(baseUdfCode, parameters.toList) + + private def inject(code: String, parameters: UiUDFParameter*): String = + PythonUdfUiParameterInjector.inject(code, parameters.toList) + + private def decoderCallCount(code: String): Int = + code.sliding("self.decode_python_template".length).count(_ == "self.decode_python_template") + + private val baseUdfCode: String = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def open(self): + | print("open") + | + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int): + | yield tuple_ + |""".stripMargin + + it should "return encoded user code unchanged when there are no UI parameters" in { + val injectedCode = inject() + + injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") + injectedCode should include("""print("open")""") + injectedCode should not include ("_texera_injected_ui_parameters") + injectedCode should not include ("self.decode_python_template") + injectedCode should not include ("import typing") + } + + it should "inject UI parameter hook into supported UDF class using Dict and Any from pytexera" in { + val injectedCode = inject(uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z")) + + injectedCode should include("class ProcessTupleOperator(UDFOperatorV2):") + injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") + injectedCode should include("return {") + injectedCode should include("self.decode_python_template") + decoderCallCount(injectedCode) shouldBe 2 + injectedCode should include("""print("open")""") + injectedCode should not include ("import typing") + injectedCode should not include ("typing.Dict") + injectedCode should not include ("typing.Any") + } + + it should "append the reserved hook inside the class before the next top-level statement" in { + val udfCodeWithSiblingDefinition = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def open(self): + | print("open") + | + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int): + | yield tuple_ + | + |def helper(): + | return "outside" + |""".stripMargin + + val injectedCode = + inject(udfCodeWithSiblingDefinition, uiParameter("k", AttributeType.STRING, "v")) + + val hookIndex = injectedCode.indexOf("def _texera_injected_ui_parameters(self)") + val processTupleIndex = + injectedCode.indexOf("def process_tuple(self, tuple_: Tuple, port: int):") + val helperIndex = injectedCode.indexOf("def helper():") + + hookIndex should be >= 0 + processTupleIndex should be < hookIndex + helperIndex should be > hookIndex + } + + it should "preserve multiple UI parameters in the injected map" in { + val injectedCode = inject( + uiParameter("param1", AttributeType.DOUBLE, "12.5"), + uiParameter("param2", AttributeType.INTEGER, "1"), + uiParameter("param3", AttributeType.STRING, "Hola"), + uiParameter("param4", AttributeType.TIMESTAMP, "2026-02-28T03:15:00Z") + ) + + injectedCode should include("def _texera_injected_ui_parameters(self) -> Dict[str, Any]:") + injectedCode should include("self.decode_python_template") + decoderCallCount(injectedCode) shouldBe 8 + injectedCode should not include ("import typing") + } + + it should "throw when a parameter attribute is missing" in { + val invalidParameter = new UiUDFParameter + invalidParameter.attribute = null + invalidParameter.value = "anything" + + val exception = the[RuntimeException] thrownBy { + inject(invalidParameter) + } + + exception.getMessage should include("UiParameter attribute is required") + } + + it should "throw when a UI parameter name is duplicated" in { + val exception = the[RuntimeException] thrownBy { + inject( + uiParameter("date", AttributeType.STRING, "2024-01-01"), + uiParameter("date", AttributeType.TIMESTAMP, "2024-01-01T00:00:00Z") + ) + } + + exception.getMessage should include("UiParameter name 'date' is declared more than once") + } + + Seq(AttributeType.BINARY, AttributeType.LARGE_BINARY).foreach { unsupportedType => + it should s"throw when a UI parameter uses ${unsupportedType.name()} type" in { + val exception = the[RuntimeException] thrownBy { + inject(uiParameter("payload", unsupportedType, "68656c6c6f")) + } + + exception.getMessage should include( + s"UiParameter type '${unsupportedType.name()}' is not supported" + ) + } + } + + it should "throw when the reserved hook is already defined by the user" in { + val udfWithReservedHook = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | def _texera_injected_ui_parameters(self): + | return {} + | + | def open(self): + | pass + |""".stripMargin + + val exception = the[RuntimeException] thrownBy { + inject(udfWithReservedHook, uiParameter("k", AttributeType.STRING, "v")) + } + + exception.getMessage should include( + "Reserved method '_texera_injected_ui_parameters' is already defined" + ) + } + + it should "leave code unchanged when no supported user class is present" in { + val nonSupportedCode = + """from pytexera import * + | + |class SomethingElse: + | def open(self): + | pass + |""".stripMargin + + val injectedCode = inject(nonSupportedCode, uiParameter("k", AttributeType.STRING, "v")) + + injectedCode should not include ("_texera_injected_ui_parameters") + injectedCode should include("class SomethingElse:") + injectedCode should not include ("import typing") + } +}