From 2decd5cdb340f7c08c53e73af513172ab91b572c Mon Sep 17 00:00:00 2001 From: Myracle Date: Wed, 11 Feb 2026 11:16:26 +0800 Subject: [PATCH 1/2] [FLINK-39064][Table SQL / API] Add built-in REGEXP_SPLIT function to split string by regular expression pattern --- .../table/api/internal/BaseExpressions.java | 27 ++++++ .../functions/BuiltInFunctionDefinitions.java | 14 +++ .../functions/RegexpFunctionsITCase.java | 93 ++++++++++++++++++- .../functions/scalar/RegexpSplitFunction.java | 91 ++++++++++++++++++ 4 files changed, 224 insertions(+), 1 deletion(-) create mode 100644 flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 94fbd7c101d49..ea6e0b2b95ad9 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -189,6 +189,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_INSTR; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_SPLIT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_SUBSTR; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPEAT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REPLACE; @@ -1927,6 +1928,32 @@ public OutType split(InType delimiter) { unresolvedCall(SPLIT, toExpr(), objectToExpression(delimiter))); } + /** + * Returns an array of substrings by splitting 
the input string based on a regular expression + * pattern. + * + *

If the pattern is not found in the string, the original string is returned as the only + * element in the array. If the pattern is empty, the string is split into its individual + * characters. If the string or pattern is null, or the pattern is not a valid regular + * expression, a null value is returned. If the pattern matches at the beginning or end of the + * string, or there are contiguous matches, an empty string is added to the array at that + * position. + * + *

Examples: + * + *

{@code
+     * lit("Hello123World456").regexpSplit("[0-9]+") // ["Hello", "World", ""]
+     * lit("a,b;c").regexpSplit("[,;]") // ["a", "b", "c"]
+     * lit("one  two   three").regexpSplit("\\s+") // ["one", "two", "three"]
+     * }
+ * + * @param regex The regular expression pattern to split by. + * @return An array of substrings. + */ + public OutType regexpSplit(InType regex) { + return toApiSpecificExpression( + unresolvedCall(REGEXP_SPLIT, toExpr(), objectToExpression(regex))); + } + /** Returns the keys of the map as an array. */ public OutType mapKeys() { return toApiSpecificExpression(unresolvedCall(MAP_KEYS, toExpr())); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index e8783ba51c859..bf2330d2471fa 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -445,6 +445,20 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) .runtimeClass("org.apache.flink.table.runtime.functions.scalar.SplitFunction") .build(); + public static final BuiltInFunctionDefinition REGEXP_SPLIT = + BuiltInFunctionDefinition.newBuilder() + .name("REGEXP_SPLIT") + .sqlName("REGEXP_SPLIT") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.ARRAY(STRING())))) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.RegexpSplitFunction") + .build(); + public static final BuiltInFunctionDefinition URL_DECODE = BuiltInFunctionDefinition.newBuilder() .name("URL_DECODE") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java index c5a63826c7ea4..d8f7733ee2453 100644 --- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java @@ -37,7 +37,8 @@ Stream getTestSetSpecs() { regexpExtractTestCases(), regexpExtractAllTestCases(), regexpInstrTestCases(), - regexpSubstrTestCases()) + regexpSubstrTestCases(), + regexpSplitTestCases()) .flatMap(s -> s); } @@ -387,4 +388,94 @@ private Stream regexpSubstrTestCases() { "Invalid input arguments. Expected signatures are:\n" + "REGEXP_SUBSTR(str , regex )")); } + + private Stream regexpSplitTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_SPLIT) + .onFieldsWithData( + "Hello123World456", + null, + "a,b;c|d", + "one two three", + 123, + "12345", + ",123,,,123,") + .andDataTypes( + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING().notNull(), + DataTypes.STRING().notNull(), + DataTypes.INT().notNull(), + DataTypes.STRING().notNull(), + DataTypes.STRING().notNull()) + // Basic regex split + .testResult( + $("f0").regexpSplit("[0-9]+"), + "REGEXP_SPLIT(f0, '[0-9]+')", + new String[] {"Hello", "World", ""}, + DataTypes.ARRAY(DataTypes.STRING()).notNull()) + // null input test + .testResult( + $("f0").regexpSplit(null), + "REGEXP_SPLIT(f0, NULL)", + null, + DataTypes.ARRAY(DataTypes.STRING())) + // Empty regex - split by character + .testResult( + $("f5").regexpSplit(""), + "REGEXP_SPLIT(f5, '')", + new String[] {"1", "2", "3", "4", "5"}, + DataTypes.ARRAY(DataTypes.STRING()).notNull()) + // null string input + .testResult( + $("f1").regexpSplit("[0-9]+"), + "REGEXP_SPLIT(f1, '[0-9]+')", + null, + DataTypes.ARRAY(DataTypes.STRING())) + // null string and null pattern + .testResult( + $("f1").regexpSplit(null), + "REGEXP_SPLIT(f1, null)", + null, + DataTypes.ARRAY(DataTypes.STRING())) + // Multi-character delimiter regex + .testResult( + 
$("f2").regexpSplit("[,;|]"), + "REGEXP_SPLIT(f2, '[,;|]')", + new String[] {"a", "b", "c", "d"}, + DataTypes.ARRAY(DataTypes.STRING()).notNull()) + // Whitespace regex + .testResult( + $("f3").regexpSplit("\\s+"), + "REGEXP_SPLIT(f3, '\\\\s+')", + new String[] {"one", "two", "three"}, + DataTypes.ARRAY(DataTypes.STRING()).notNull()) + // No match - return original string + .testResult( + $("f5").regexpSplit("[a-z]+"), + "REGEXP_SPLIT(f5, '[a-z]+')", + new String[] {"12345"}, + DataTypes.ARRAY(DataTypes.STRING()).notNull()) + // Invalid regex - return null + .testResult( + $("f0").regexpSplit("("), + "REGEXP_SPLIT(f0, '(')", + null, + DataTypes.ARRAY(DataTypes.STRING()).notNull()) + // Validation error for non-string type input + .testTableApiValidationError( + $("f4").regexpSplit("[0-9]+"), + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_SPLIT(, )") + .testSqlValidationError( + "REGEXP_SPLIT(f4, '[0-9]+')", + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_SPLIT(, )") + .testSqlValidationError( + "REGEXP_SPLIT()", + "No match found for function signature REGEXP_SPLIT()") + .testSqlValidationError( + "REGEXP_SPLIT(f1, '1', '2')", + "No match found for function signature REGEXP_SPLIT(, , )")); + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java new file mode 100644 index 0000000000000..3054fa719d137 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; + +import javax.annotation.Nullable; + +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#REGEXP_SPLIT}. + * + *

Splits a string by a regular expression pattern and returns an array of substrings. + * + *

Examples: + * + *

{@code
+ * REGEXP_SPLIT('Hello123World456', '[0-9]+') = ['Hello', 'World', '']
+ * REGEXP_SPLIT('a,b;c', '[,;]') = ['a', 'b', 'c']
+ * REGEXP_SPLIT('one  two   three', '\s+') = ['one', 'two', 'three']
+ * }
+ */ +@Internal +public class RegexpSplitFunction extends BuiltInScalarFunction { + + private transient Pattern cachedPattern; + private transient String cachedRegex; + + public RegexpSplitFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.REGEXP_SPLIT, context); + } + + public @Nullable ArrayData eval(@Nullable StringData str, @Nullable StringData regex) { + if (str == null || regex == null) { + return null; + } + + String regexStr = regex.toString(); + if (regexStr.isEmpty()) { + // If regex is empty, split by each character + String strValue = str.toString(); + StringData[] result = new StringData[strValue.length()]; + for (int i = 0; i < strValue.length(); i++) { + result[i] = StringData.fromString(String.valueOf(strValue.charAt(i))); + } + return new GenericArrayData(result); + } + + try { + // Cache the compiled pattern to improve performance + if (cachedPattern == null || !regexStr.equals(cachedRegex)) { + cachedPattern = Pattern.compile(regexStr); + cachedRegex = regexStr; + } + + // Use -1 as limit to keep all trailing empty strings + String[] splitResult = cachedPattern.split(str.toString(), -1); + StringData[] result = new StringData[splitResult.length]; + for (int i = 0; i < splitResult.length; i++) { + result[i] = StringData.fromString(splitResult[i]); + } + return new GenericArrayData(result); + } catch (PatternSyntaxException e) { + // Return null for invalid regex pattern (consistent with other REGEXP_* functions) + return null; + } + } +} From d84cb22cfaaf7d0db898605ffb245a03743064a1 Mon Sep 17 00:00:00 2001 From: Myracle Date: Thu, 12 Feb 2026 15:13:26 +0800 Subject: [PATCH 2/2] hotfix --- docs/data/sql_functions.yml | 36 +++++++++++++++++++ docs/data/sql_functions_zh.yml | 10 ++++++ .../reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 12 +++++++ .../functions/RegexpFunctionsITCase.java | 10 +++--- .../runtime/functions/SqlFunctionUtils.java | 18 ++++++++++ 
.../functions/scalar/RegexpSplitFunction.java | 31 +++++++--------- 7 files changed, 94 insertions(+), 24 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 0ee07f21c5774..b4eff5b4ec3a4 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -409,7 +409,17 @@ string: `str <STRING>, regex <STRING>` - Returns an `STRING` representation of the first matched substring. `NULL` if any of the arguments are `NULL` or regex if invalid or pattern is not found. + Returns an `STRING` representation of the first matched substring. `NULL` if any of the arguments are `NULL` or regex is invalid or pattern is not found. + - sql: REGEXP_SPLIT(str, regex) + table: str.regexpSplit(regex) + description: | + Splits str by the regular expression regex and returns an array of strings. + + E.g., REGEXP_SPLIT('Hello123World456', '[0-9]+') returns ['Hello', 'World', '']. + + `str <STRING>, regex <STRING>` + + Returns an `ARRAY<STRING>` of split substrings. `NULL` if any of the arguments are `NULL` or regex is invalid.
 - sql: TRANSLATE(expr, fromStr, toStr) table: expr.translate(fromStr, toStr) description: | diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index dfd35ad0e3d74..b934a3589e871 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -477,6 +477,16 @@ string: `str <STRING>, regex <STRING>` 返回一个 `STRING` 表示 str 中第一个匹配 regex 的子字符串。如果任何参数为 `NULL` 或 regex 非法或匹配失败,则返回 `NULL`。 + - sql: REGEXP_SPLIT(str, regex) + table: str.regexpSplit(regex) + description: | + 按正则表达式 regex 拆分字符串 str,并返回一个字符串数组。 + + 例如 `REGEXP_SPLIT('Hello123World456', '[0-9]+')` 返回 `['Hello', 'World', '']`。 + + `str <STRING>, regex <STRING>` + + 返回一个 `ARRAY<STRING>` 表示拆分后的子字符串数组。如果任何参数为 `NULL` 或 regex 非法,则返回 `NULL`。 - sql: TRANSLATE(expr, fromStr, toStr) table: expr.translate(fromStr, toStr) description: | diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index e2aeb34f41107..233ebb7c073fb 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -189,6 +189,7 @@ string functions Expression.regexp_extract_all Expression.regexp_instr Expression.regexp_substr + Expression.regexp_split Expression.from_base64 Expression.to_base64 Expression.ascii diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 04649a6472757..b2a8357cdd411 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1362,6 +1362,18 @@ def regexp_substr(self, regex) -> 'Expression': """ return _binary_op("regexpSubstr")(self, regex) + def regexp_split(self, regex) -> 'Expression': + """ + Splits the string by the regular expression regex and returns an array of strings. + null if any of the arguments are null or regex is invalid. + + E.g., regexp_split('Hello123World456', '[0-9]+') returns ['Hello', 'World', '']. 
+ + :param regex: A STRING expression with a matching pattern. + :return: An ARRAY of split substrings. + """ + return _binary_op("regexpSplit")(self, regex) + @property def from_base64(self) -> 'Expression[str]': """ diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java index d8f7733ee2453..744b81ad7218f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java @@ -407,7 +407,7 @@ private Stream regexpSplitTestCases() { DataTypes.STRING().notNull(), DataTypes.INT().notNull(), DataTypes.STRING().notNull(), - DataTypes.STRING().notNull()) + DataTypes.STRING()) // Basic regex split .testResult( $("f0").regexpSplit("[0-9]+"), @@ -447,7 +447,7 @@ private Stream regexpSplitTestCases() { // Whitespace regex .testResult( $("f3").regexpSplit("\\s+"), - "REGEXP_SPLIT(f3, '\\\\s+')", + "REGEXP_SPLIT(f3, '\\s+')", new String[] {"one", "two", "three"}, DataTypes.ARRAY(DataTypes.STRING()).notNull()) // No match - return original string @@ -458,10 +458,10 @@ private Stream regexpSplitTestCases() { DataTypes.ARRAY(DataTypes.STRING()).notNull()) // Invalid regex - return null .testResult( - $("f0").regexpSplit("("), - "REGEXP_SPLIT(f0, '(')", + $("f6").regexpSplit("("), + "REGEXP_SPLIT(f6, '(')", null, - DataTypes.ARRAY(DataTypes.STRING()).notNull()) + DataTypes.ARRAY(DataTypes.STRING())) // Validation error for non-string type input .testTableApiValidationError( $("f4").regexpSplit("[0-9]+"), diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java index 0fdd52136135e..f546c6812a8d7 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java @@ -491,6 +491,24 @@ public static Matcher getRegexpMatcher(@Nullable StringData str, @Nullable Strin } } + /** + * Returns a compiled Pattern object for the given regular expression string, using a shared + * cache for performance optimization. + * + * @param regex the regular expression pattern string + * @return the compiled Pattern, or null if regex is null or invalid + */ + public static @Nullable Pattern getRegexpPattern(@Nullable String regex) { + if (regex == null) { + return null; + } + try { + return REGEXP_PATTERN_CACHE.get(regex); + } catch (PatternSyntaxException e) { + return null; + } + } + /** * Parse string as key-value string and return the value matches key name. 
example: * keyvalue('k1=v1;k2=v2', ';', '=', 'k2') = 'v2' keyvalue('k1:v1,k2:v2', ',', ':', 'k3') = NULL diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java index 3054fa719d137..b04f6f6ba0aac 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpSplitFunction.java @@ -28,7 +28,8 @@ import javax.annotation.Nullable; import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.getRegexpPattern; /** * Implementation of {@link BuiltInFunctionDefinitions#REGEXP_SPLIT}. @@ -46,9 +47,6 @@ @Internal public class RegexpSplitFunction extends BuiltInScalarFunction { - private transient Pattern cachedPattern; - private transient String cachedRegex; - public RegexpSplitFunction(SpecializedFunction.SpecializedContext context) { super(BuiltInFunctionDefinitions.REGEXP_SPLIT, context); } @@ -69,23 +67,18 @@ public RegexpSplitFunction(SpecializedFunction.SpecializedContext context) { return new GenericArrayData(result); } - try { - // Cache the compiled pattern to improve performance - if (cachedPattern == null || !regexStr.equals(cachedRegex)) { - cachedPattern = Pattern.compile(regexStr); - cachedRegex = regexStr; - } - - // Use -1 as limit to keep all trailing empty strings - String[] splitResult = cachedPattern.split(str.toString(), -1); - StringData[] result = new StringData[splitResult.length]; - for (int i = 0; i < splitResult.length; i++) { - result[i] = StringData.fromString(splitResult[i]); - } - return new GenericArrayData(result); - } catch (PatternSyntaxException e) { + Pattern pattern = 
getRegexpPattern(regexStr); + if (pattern == null) { // Return null for invalid regex pattern (consistent with other REGEXP_* functions) return null; } + + // Use -1 as limit to keep all trailing empty strings + String[] splitResult = pattern.split(str.toString(), -1); + StringData[] result = new StringData[splitResult.length]; + for (int i = 0; i < splitResult.length; i++) { + result[i] = StringData.fromString(splitResult[i]); + } + return new GenericArrayData(result); } }