diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBM4TableFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBM4TableFunctionIT.java new file mode 100644 index 000000000000..750eeb5b0064 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBM4TableFunctionIT.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.relational.it.db.it; + +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; +import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail; +import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBM4TableFunctionIT { + + private static final String DATABASE_NAME = "test"; + + private static final String[] SQLS = + new String[] { + "CREATE DATABASE " + DATABASE_NAME, + "USE " + DATABASE_NAME, + "CREATE TABLE vehicle(device_id STRING TAG, speed DOUBLE FIELD, status STRING FIELD)", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.001+08:00, 'car_01', 5.0, 'OK')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.003+08:00, 'car_01', 15.0, 'OK')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.006+08:00, 'car_01', 30.0, 'WARN')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.009+08:00, 'car_01', 10.0, 'OK')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.020+08:00, 'car_01', 40.0, 'CRIT')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.002+08:00, 'car_02', 8.0, 'OK')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.005+08:00, 'car_02', 25.0, 'WARN')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES 
(1970-01-01T08:00:00.008+08:00, 'car_02', 12.0, 'OK')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.011+08:00, 'car_02', 18.0, 'OK')", + "INSERT INTO vehicle(time, device_id, speed, status) VALUES (1970-01-01T08:00:00.015+08:00, 'car_02', 6.0, 'WARN')", + "FLUSH" + }; + + @BeforeClass + public static void setUp() { + EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockLineNumber(2); + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(SQLS); + } + + @AfterClass + public static void tearDown() { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testTimeWindowMode() { + String[] expectedHeader = + new String[] {"window_start", "window_end", "m4_time", "m4_value", "device_id", "status"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.001Z,5.0,car_01,OK,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.006Z,30.0,car_01,WARN,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.009Z,10.0,car_01,OK,", + "1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.030Z,1970-01-01T00:00:00.020Z,40.0,car_01,CRIT,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.002Z,8.0,car_02,OK,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.005Z,25.0,car_02,WARN,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.008Z,12.0,car_02,OK,", + "1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.011Z,18.0,car_02,OK,", + "1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.015Z,6.0,car_02,WARN," + }; + tableResultSetEqualTest( + "SELECT window_start, window_end, m4_time, m4_value, device_id, status " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'speed', SIZE => 10ms)) " + + "ORDER BY device_id, window_start, m4_time", + 
expectedHeader, + retArray, + DATABASE_NAME); + + expectedHeader = + new String[] { + "window_start", + "window_end", + "m4_time", + "m4_value", + "time", + "device_id", + "speed", + "status" + }; + tableResultSetEqualTest( + "SELECT * " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'speed', SIZE => 10ms)) " + + "ORDER BY device_id, window_start, m4_time", + expectedHeader, + new String[] { + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.001Z,5.0,1970-01-01T00:00:00.001Z,car_01,5.0,OK,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.006Z,30.0,1970-01-01T00:00:00.006Z,car_01,30.0,WARN,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.009Z,10.0,1970-01-01T00:00:00.009Z,car_01,10.0,OK,", + "1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.030Z,1970-01-01T00:00:00.020Z,40.0,1970-01-01T00:00:00.020Z,car_01,40.0,CRIT,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.002Z,8.0,1970-01-01T00:00:00.002Z,car_02,8.0,OK,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.005Z,25.0,1970-01-01T00:00:00.005Z,car_02,25.0,WARN,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.008Z,12.0,1970-01-01T00:00:00.008Z,car_02,12.0,OK,", + "1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.011Z,18.0,1970-01-01T00:00:00.011Z,car_02,18.0,OK,", + "1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.015Z,6.0,1970-01-01T00:00:00.015Z,car_02,6.0,WARN," + }, + DATABASE_NAME); + } + + @Test + public void testTimeWindowModeByPosition() { + String[] expectedHeader = + new String[] {"window_start", "window_end", "m4_time", "m4_value", "device_id", "status"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.001Z,5.0,car_01,OK,", + 
"1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.006Z,30.0,car_01,WARN,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.009Z,10.0,car_01,OK,", + "1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.030Z,1970-01-01T00:00:00.020Z,40.0,car_01,CRIT,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.002Z,8.0,car_02,OK,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.005Z,25.0,car_02,WARN,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.008Z,12.0,car_02,OK,", + "1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.011Z,18.0,car_02,OK,", + "1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.015Z,6.0,car_02,WARN," + }; + tableResultSetEqualTest( + "SELECT window_start, window_end, m4_time, m4_value, device_id, status " + + "FROM TABLE(M4(TABLE(vehicle) PARTITION BY device_id ORDER BY time, 'time', 'speed', 10ms)) " + + "ORDER BY device_id, window_start, m4_time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testGroupBy() { + String[] expectedHeader = new String[] {"device_id", "point_count"}; + String[] retArray = new String[] {"car_01,4,", "car_02,5,"}; + tableResultSetEqualTest( + "SELECT device_id, COUNT(*) AS point_count " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'speed', SIZE => 10ms)) " + + "GROUP BY device_id ORDER BY device_id", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testTimeWindowWithGap() { + String[] expectedHeader = + new String[] {"window_start", "window_end", "m4_time", "m4_value", "device_id"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.005Z,1970-01-01T00:00:00.001Z,5.0,car_01,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.005Z,1970-01-01T00:00:00.003Z,15.0,car_01,", + 
"1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.025Z,1970-01-01T00:00:00.020Z,40.0,car_01,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.005Z,1970-01-01T00:00:00.002Z,8.0,car_02,", + "1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.015Z,1970-01-01T00:00:00.011Z,18.0,car_02," + }; + tableResultSetEqualTest( + "SELECT window_start, window_end, m4_time, m4_value, device_id " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'speed', SIZE => 5ms, SLIDE => 10ms)) " + + "ORDER BY device_id, window_start, m4_time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testDisplayWindowRange() { + String[] expectedHeader = + new String[] {"window_start", "window_end", "m4_time", "m4_value", "device_id"}; + String[] retArray = + new String[] { + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.001Z,5.0,car_01,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.006Z,30.0,car_01,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.009Z,10.0,car_01,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.002Z,8.0,car_02,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.005Z,25.0,car_02,", + "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.008Z,12.0,car_02," + }; + tableResultSetEqualTest( + "SELECT window_start, window_end, m4_time, m4_value, device_id " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'speed', SIZE => 10ms, " + + "DISPLAYBEGIN => 1970-01-01T08:00:00.000+08:00, " + + "DISPLAYEND => 1970-01-01T08:00:00.010+08:00)) " + + "ORDER BY device_id, window_start, m4_time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testCountWindowMode() { + String[] expectedHeader = + new String[] {"window_start", "window_end", "m4_time", "m4_value", "device_id"}; + 
String[] retArray = + new String[] { + "1970-01-01T00:00:00.001Z,1970-01-01T00:00:00.004Z,1970-01-01T00:00:00.001Z,5.0,car_01,", + "1970-01-01T00:00:00.001Z,1970-01-01T00:00:00.004Z,1970-01-01T00:00:00.003Z,15.0,car_01,", + "1970-01-01T00:00:00.006Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.006Z,30.0,car_01,", + "1970-01-01T00:00:00.006Z,1970-01-01T00:00:00.010Z,1970-01-01T00:00:00.009Z,10.0,car_01,", + "1970-01-01T00:00:00.020Z,1970-01-01T00:00:00.021Z,1970-01-01T00:00:00.020Z,40.0,car_01,", + "1970-01-01T00:00:00.002Z,1970-01-01T00:00:00.006Z,1970-01-01T00:00:00.002Z,8.0,car_02,", + "1970-01-01T00:00:00.002Z,1970-01-01T00:00:00.006Z,1970-01-01T00:00:00.005Z,25.0,car_02,", + "1970-01-01T00:00:00.008Z,1970-01-01T00:00:00.012Z,1970-01-01T00:00:00.008Z,12.0,car_02,", + "1970-01-01T00:00:00.008Z,1970-01-01T00:00:00.012Z,1970-01-01T00:00:00.011Z,18.0,car_02,", + "1970-01-01T00:00:00.015Z,1970-01-01T00:00:00.016Z,1970-01-01T00:00:00.015Z,6.0,car_02," + }; + tableResultSetEqualTest( + "SELECT window_start, window_end, m4_time, m4_value, device_id " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'speed', SIZE => 2)) " + + "ORDER BY device_id, window_start, m4_time", + expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testIllegalValueType() { + tableAssertTestFail( + "SELECT m4_time, m4_value, device_id " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'status', SIZE => 10ms))", + "701: The type of the column [status] is not as expected.", + DATABASE_NAME); + } + + @Test + public void testValueColumnNotFound() { + tableAssertTestFail( + "SELECT m4_time, m4_value, device_id " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'fake_speed', SIZE => 10ms))", + "701: Required column [fake_speed] not found in the source table argument.", + 
DATABASE_NAME); + } + + @Test + public void testMissingSize() { + tableAssertTestFail( + "SELECT m4_time, m4_value, device_id " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id ORDER BY time, " + + "TIMECOL => 'time', VALUECOL => 'speed'))", + "701: Missing required argument: SIZE", + DATABASE_NAME); + } + + @Test + public void testMissingOrderBy() { + tableAssertTestFail( + "SELECT m4_time, m4_value, device_id " + + "FROM TABLE(M4(DATA => TABLE(vehicle) PARTITION BY device_id, " + + "TIMECOL => 'time', VALUECOL => 'speed', SIZE => 10ms))", + "701: Table argument with set semantics requires an ORDER BY clause.", + DATABASE_NAME); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index 6d3326dd4026..b9b1026a4a82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -119,6 +119,7 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.M4TableFunction; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.MPPQueryContext.ExplainType; @@ -5022,9 +5023,69 @@ private ArgumentsAnalysis analyzeArguments( analyzeDefault(parameterSpecification, errorLocation)); } } + tryAppendM4ModeArgument(functionName, arguments, parameterSpecifications, passedArguments); return new ArgumentsAnalysis(passedArguments.buildOrThrow(), 
tableArgumentAnalyses.build()); } + private void tryAppendM4ModeArgument( + String functionName, + List arguments, + List parameterSpecifications, + ImmutableMap.Builder passedArguments) { + if (!TableBuiltinTableFunction.M4.getFunctionName().equalsIgnoreCase(functionName)) { + return; + } + + TableFunctionArgument sizeArgument = + findTableFunctionArgument( + arguments, parameterSpecifications, M4TableFunction.SIZE_PARAMETER_NAME); + if (!(sizeArgument.getValue() instanceof Expression)) { + throw new SemanticException( + String.format( + "Invalid argument %s. Expected scalar argument, got table", + M4TableFunction.SIZE_PARAMETER_NAME)); + } + + boolean isTimeWindow = sizeArgument.getValue() instanceof TimeDurationLiteral; + passedArguments.put( + M4TableFunction.WINDOW_MODE_PARAMETER_NAME, + new ScalarArgument(org.apache.iotdb.udf.api.type.Type.BOOLEAN, isTimeWindow)); + } + + private TableFunctionArgument findTableFunctionArgument( + List arguments, + List parameterSpecifications, + String argumentName) { + if (arguments.isEmpty()) { + throw new IllegalStateException("Arguments should never be empty when resolving M4 mode"); + } + + boolean argumentsPassedByName = + arguments.stream().allMatch(argument -> argument.getName().isPresent()); + if (argumentsPassedByName) { + return arguments.stream() + .filter( + argument -> + argumentName.equalsIgnoreCase(argument.getName().get().getCanonicalValue())) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + String.format("Missing required argument: %s", argumentName))); + } + + for (int i = 0, size = parameterSpecifications.size(); i < size; i++) { + if (argumentName.equalsIgnoreCase(parameterSpecifications.get(i).getName())) { + if (i >= arguments.size()) { + throw new IllegalStateException( + String.format("Missing required argument: %s", argumentName)); + } + return arguments.get(i); + } + } + throw new IllegalStateException(String.format("Unknown argument: %s", argumentName)); + } + // append order 
by time asc for built-in forecast tvf if user doesn't specify order by clause private void tryUpdateOrderByForForecastByName( String functionName, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java index 9ead3d187ac8..d519d237118c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java @@ -46,6 +46,7 @@ import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AnalyzerTest.analyzeSQL; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.QUERY_CONTEXT; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.TEST_MATADATA; +import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestUtils.assertAnalyzeSemanticException; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregationFunction; @@ -225,6 +226,85 @@ public void testSimpleSetSemantic() { assertPlan(planTester.getFragmentPlan(5), tableScan); } + @Test + public void testM4TimeWindowMode() { + PlanTester planTester = new PlanTester(); + String sql = + "SELECT * FROM TABLE(M4(" + + "DATA => TABLE(table1) PARTITION BY tag1 ORDER BY time, " + + "TIMECOL => 'time', " + + "VALUECOL => 's1', " + + "SIZE => 1h))"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = + tableScan( + "testdb.table1", + ImmutableList.of("time", "tag1", "tag2", "tag3", "attr1", 
"attr2", "s1", "s2", "s3"), + ImmutableSet.of("time", "tag1", "tag2", "tag3", "attr1", "attr2", "s1", "s2", "s3")); + + Consumer tableFunctionMatcher = + builder -> + builder + .name("m4") + .properOutputs("window_start", "window_end", "m4_time", "m4_value") + .requiredSymbols("time", "s1") + .handle( + new MapTableFunctionHandle.Builder() + .addProperty("SIZE", 3600000L) + .addProperty("SLIDE", 3600000L) + .addProperty("DISPLAYBEGIN", Long.MIN_VALUE) + .addProperty("DISPLAYEND", Long.MAX_VALUE) + .addProperty("__M4_WINDOW_MODE", true) + .addProperty("__M4_VALUE_TYPE", "INT64") + .build()); + + assertPlan( + logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, sort(tableScan)))); + } + + @Test + public void testM4CountWindowMode() { + PlanTester planTester = new PlanTester(); + String sql = + "SELECT * FROM TABLE(M4(" + + "DATA => TABLE(table1) PARTITION BY tag1 ORDER BY time, " + + "TIMECOL => 'time', " + + "VALUECOL => 's1', " + + "SIZE => 5))"; + LogicalQueryPlan logicalQueryPlan = planTester.createPlan(sql); + PlanMatchPattern tableScan = + tableScan( + "testdb.table1", + ImmutableList.of("time", "tag1", "tag2", "tag3", "attr1", "attr2", "s1", "s2", "s3"), + ImmutableSet.of("time", "tag1", "tag2", "tag3", "attr1", "attr2", "s1", "s2", "s3")); + + Consumer tableFunctionMatcher = + builder -> + builder + .name("m4") + .properOutputs("window_start", "window_end", "m4_time", "m4_value") + .requiredSymbols("time", "s1") + .handle( + new MapTableFunctionHandle.Builder() + .addProperty("SIZE", 5L) + .addProperty("SLIDE", 5L) + .addProperty("DISPLAYBEGIN", Long.MIN_VALUE) + .addProperty("DISPLAYEND", Long.MAX_VALUE) + .addProperty("__M4_WINDOW_MODE", false) + .addProperty("__M4_VALUE_TYPE", "INT64") + .build()); + + assertPlan( + logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, sort(tableScan)))); + } + + @Test + public void testM4MissingOrderBy() { + assertAnalyzeSemanticException( + "SELECT * FROM TABLE(M4(DATA => TABLE(table1) 
PARTITION BY tag1, TIMECOL => 'time', VALUECOL => 's1', SIZE => 5))", + "Table argument with set semantics requires an ORDER BY clause."); + } + @Test public void testLeafFunction() { PlanTester planTester = new PlanTester(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java index 577f02aabd71..86e6eb8418be 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/queryengine/plan/relational/function/TableBuiltinTableFunction.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.udf.builtin.relational.tvf.CapacityTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.CumulateTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.M4TableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.SessionTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.TumbleTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.VariationTableFunction; @@ -42,6 +43,7 @@ public enum TableBuiltinTableFunction { SESSION("session"), VARIATION("variation"), CAPACITY("capacity"), + M4("m4"), FORECAST("forecast"), PATTERN_MATCH("pattern_match"), CLASSIFY("classify"); @@ -86,6 +88,8 @@ public static TableFunction getBuiltinTableFunction(String functionName) { return new PatternMatchTableFunction(); case "capacity": return new CapacityTableFunction(); + case "m4": + return new M4TableFunction(); case "forecast": return new ForecastTableFunction(); case "classify": diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java new file mode 100644 index 000000000000..d5b3a9a83396 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.udf.builtin.relational.tvf; + +import org.apache.iotdb.commons.exception.SemanticException; +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; +import org.apache.iotdb.udf.api.relational.table.argument.Argument; +import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; +import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; +import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; +import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; +import org.apache.iotdb.udf.api.type.Type; + +import com.google.common.collect.ImmutableSet; +import org.apache.tsfile.block.column.ColumnBuilder; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; +import static org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER; + +public class M4TableFunction implements TableFunction { + + public static final String DATA_PARAMETER_NAME = "DATA"; + 
public static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; + public static final String VALUECOL_PARAMETER_NAME = "VALUECOL"; + public static final String SIZE_PARAMETER_NAME = "SIZE"; + public static final String SLIDE_PARAMETER_NAME = "SLIDE"; + public static final String DISPLAYBEGIN_PARAMETER_NAME = "DISPLAYBEGIN"; + public static final String DISPLAYEND_PARAMETER_NAME = "DISPLAYEND"; + public static final String WINDOW_MODE_PARAMETER_NAME = "__M4_WINDOW_MODE"; + + private static final String OUTPUT_WINDOW_START_COLUMN = "window_start"; + private static final String OUTPUT_WINDOW_END_COLUMN = "window_end"; + private static final String OUTPUT_TIME_COLUMN = "m4_time"; + private static final String OUTPUT_VALUE_COLUMN = "m4_value"; + private static final String VALUE_TYPE_PROPERTY = "__M4_VALUE_TYPE"; + private static final long UNSPECIFIED_SLIDE = Long.MIN_VALUE; + private static final long UNSPECIFIED_DISPLAY_BEGIN = Long.MIN_VALUE; + private static final long INVALID_INDEX = -1; + + @Override + public List getArgumentsSpecifications() { + return Arrays.asList( + TableParameterSpecification.builder() + .name(DATA_PARAMETER_NAME) + .setSemantics() + .passThroughColumns() + .build(), + ScalarParameterSpecification.builder() + .name(TIMECOL_PARAMETER_NAME) + .type(Type.STRING) + .build(), + ScalarParameterSpecification.builder() + .name(VALUECOL_PARAMETER_NAME) + .type(Type.STRING) + .build(), + ScalarParameterSpecification.builder() + .name(SIZE_PARAMETER_NAME) + .type(Type.INT64) + .addChecker(POSITIVE_LONG_CHECKER) + .build(), + ScalarParameterSpecification.builder() + .name(SLIDE_PARAMETER_NAME) + .type(Type.INT64) + .defaultValue(UNSPECIFIED_SLIDE) + .build(), + ScalarParameterSpecification.builder() + .name(DISPLAYBEGIN_PARAMETER_NAME) + .type(Type.TIMESTAMP) + .defaultValue(UNSPECIFIED_DISPLAY_BEGIN) + .build(), + ScalarParameterSpecification.builder() + .name(DISPLAYEND_PARAMETER_NAME) + .type(Type.TIMESTAMP) + .defaultValue(Long.MAX_VALUE) + 
.build()); + } + + @Override + public TableFunctionAnalysis analyze(Map arguments) throws UDFException { + TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); + if (tableArgument.getOrderBy().isEmpty()) { + throw new SemanticException("Table argument with set semantics requires an ORDER BY clause."); + } + + String timeColumn = + (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); + String valueColumn = + (String) ((ScalarArgument) arguments.get(VALUECOL_PARAMETER_NAME)).getValue(); + int timeColumnIndex = + findColumnIndex(tableArgument, timeColumn, Collections.singleton(Type.TIMESTAMP)); + int valueColumnIndex = + findColumnIndex(tableArgument, valueColumn, ImmutableSet.copyOf(Type.numericTypes())); + + long size = (long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue(); + long slide = (long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(); + if (slide == UNSPECIFIED_SLIDE) { + slide = size; + } else if (slide <= 0) { + throw new UDFException("Invalid scalar argument SLIDE, should be a positive value"); + } + + Type valueType = tableArgument.getFieldTypes().get(valueColumnIndex); + boolean isTimeWindow = + arguments.containsKey(WINDOW_MODE_PARAMETER_NAME) + && (boolean) ((ScalarArgument) arguments.get(WINDOW_MODE_PARAMETER_NAME)).getValue(); + + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty(WINDOW_MODE_PARAMETER_NAME, isTimeWindow) + .addProperty(SIZE_PARAMETER_NAME, size) + .addProperty(SLIDE_PARAMETER_NAME, slide) + .addProperty( + DISPLAYBEGIN_PARAMETER_NAME, + ((ScalarArgument) arguments.get(DISPLAYBEGIN_PARAMETER_NAME)).getValue()) + .addProperty( + DISPLAYEND_PARAMETER_NAME, + ((ScalarArgument) arguments.get(DISPLAYEND_PARAMETER_NAME)).getValue()) + .addProperty(VALUE_TYPE_PROPERTY, valueType.name()) + .build(); + + return TableFunctionAnalysis.builder() + .properColumnSchema( + new DescribedSchema.Builder() + 
.addField(OUTPUT_WINDOW_START_COLUMN, Type.TIMESTAMP) + .addField(OUTPUT_WINDOW_END_COLUMN, Type.TIMESTAMP) + .addField(OUTPUT_TIME_COLUMN, Type.TIMESTAMP) + .addField(OUTPUT_VALUE_COLUMN, valueType) + .build()) + .requireRecordSnapshot(false) + .requiredColumns(DATA_PARAMETER_NAME, Arrays.asList(timeColumnIndex, valueColumnIndex)) + .handle(handle) + .build(); + } + + @Override + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + MapTableFunctionHandle handle = (MapTableFunctionHandle) tableFunctionHandle; + boolean isTimeWindow = (boolean) handle.getProperty(WINDOW_MODE_PARAMETER_NAME); + long size = (long) handle.getProperty(SIZE_PARAMETER_NAME); + long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME); + long displayBegin = (long) handle.getProperty(DISPLAYBEGIN_PARAMETER_NAME); + long displayEnd = (long) handle.getProperty(DISPLAYEND_PARAMETER_NAME); + ValueOperator valueOperator = + ValueOperator.fromType(Type.valueOf((String) handle.getProperty(VALUE_TYPE_PROPERTY))); + + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionDataProcessor getDataProcessor() { + return isTimeWindow + ? 
new TimeWindowM4DataProcessor(valueOperator, size, slide, displayBegin, displayEnd) + : new CountWindowM4DataProcessor(valueOperator, size, slide, displayBegin, displayEnd); + } + }; + } + + private enum ValueOperator { + INT32(Type.INT32) { + @Override + Object read(Record record) { + return record.getInt(1); + } + + @Override + int compare(Object left, Object right) { + return Integer.compare((Integer) left, (Integer) right); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeInt((Integer) value); + } + }, + INT64(Type.INT64) { + @Override + Object read(Record record) { + return record.getLong(1); + } + + @Override + int compare(Object left, Object right) { + return Long.compare((Long) left, (Long) right); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeLong((Long) value); + } + }, + FLOAT(Type.FLOAT) { + @Override + Object read(Record record) { + return record.getFloat(1); + } + + @Override + int compare(Object left, Object right) { + return Float.compare((Float) left, (Float) right); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeFloat((Float) value); + } + }, + DOUBLE(Type.DOUBLE) { + @Override + Object read(Record record) { + return record.getDouble(1); + } + + @Override + int compare(Object left, Object right) { + return Double.compare((Double) left, (Double) right); + } + + @Override + void write(ColumnBuilder builder, Object value) { + builder.writeDouble((Double) value); + } + }; + + private final Type type; + + ValueOperator(Type type) { + this.type = type; + } + + abstract Object read(Record record); + + abstract int compare(Object left, Object right); + + abstract void write(ColumnBuilder builder, Object value); + + static ValueOperator fromType(Type type) { + for (ValueOperator valueOperator : values()) { + if (valueOperator.type == type) { + return valueOperator; + } + } + throw new IllegalArgumentException("Unsupported M4 value type: " + 
type); + } + } + + /** A remembered row: its row index, time and boxed value. The index stays at INVALID_INDEX until {@code set} is called. */ private static class Candidate { + private long index = INVALID_INDEX; + private long time; + private Object value; + + private void set(long index, long time, Object value) { + this.index = index; + this.time = time; + this.value = value; + } + } + + /** Holds the four candidate rows of one window: the first row, the last row, the row with the smallest value (bottom) and the row with the largest value (top). Subclasses supply the window start and end that are written to the output. */ private abstract static class WindowState { + protected final Candidate first = new Candidate(); + protected final Candidate last = new Candidate(); + protected final Candidate bottom = new Candidate(); + protected final Candidate top = new Candidate(); + + /** Folds one row into the window's candidates. */ private void update(long rowIndex, long time, Object value, ValueOperator valueOperator) { + /* The first row of a window seeds all four candidates. */ if (first.index == INVALID_INDEX) { + first.set(rowIndex, time, value); + last.set(rowIndex, time, value); + bottom.set(rowIndex, time, value); + top.set(rowIndex, time, value); + return; + } + + /* Every later row becomes the new last; bottom/top change only on a strictly smaller/larger value, so the earliest row is kept on ties. */ last.set(rowIndex, time, value); + if (valueOperator.compare(value, bottom.value) < 0) { + bottom.set(rowIndex, time, value); + } + if (valueOperator.compare(value, top.value) > 0) { + top.set(rowIndex, time, value); + } + } + + /** Returns true once at least one row has been folded in. */ private boolean hasOutput() { + return first.index != INVALID_INDEX; + } + + protected abstract long getOutputWindowStart(); + + protected abstract long getOutputWindowEnd(); + } + + /** Window state whose reported start and end are the fixed windowStart and endExclusive given at construction. */ private static class TimeWindowState extends WindowState { + private final long windowStart; + private final long endExclusive; + + private TimeWindowState(long windowStart, long endExclusive) { + this.windowStart = windowStart; + this.endExclusive = endExclusive; + } + + @Override + protected long getOutputWindowStart() { + return windowStart; + } + + @Override + protected long getOutputWindowEnd() { + return endExclusive; + } + } + + /** Window state whose reported start and end come from the mutable windowStart/windowEnd fields, both initialised to Long.MIN_VALUE; endExclusive is kept separately and is not what the output accessors return. */ private static class CountWindowState extends WindowState { + private final long endExclusive; + private long windowStart = Long.MIN_VALUE; + private long windowEnd = Long.MIN_VALUE; + + private CountWindowState(long endExclusive) { + this.endExclusive = endExclusive; + } + + @Override + protected long getOutputWindowStart() { 
+ return windowStart; + } + + @Override + protected long getOutputWindowEnd() { + return windowEnd; + } + } + + /** Common base of the two M4 processors. It filters incoming rows by the display range, maintains a running row index (curIndex), and offers helpers for updating a window and writing a window's candidates to the output builders. */ private abstract static class AbstractM4DataProcessor implements TableFunctionDataProcessor { + protected final ValueOperator valueOperator; + protected final long size; + protected final long slide; + protected final long displayBegin; + protected final long displayEnd; + + /* Index of the row currently being processed; incremented for every input row, including rows that are filtered out. */ protected long curIndex = 0; + /* Set once a row with time >= displayEnd has been seen; never reset afterwards. */ protected boolean reachedDisplayEnd = false; + + protected AbstractM4DataProcessor( + ValueOperator valueOperator, long size, long slide, long displayBegin, long displayEnd) { + this.valueOperator = valueOperator; + this.size = size; + this.slide = slide; + this.displayBegin = displayBegin; + this.displayEnd = displayEnd; + } + + /** Applies the display-range filter to one row and forwards rows inside the range to {@code processFilteredRecord}. */ @Override + public final void process( + Record input, + List properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + /* Column 0 is read as a long and used as the row time. */ long time = input.getLong(0); + /* After the first row at or beyond displayEnd, this and all following rows are skipped. */ if (reachedDisplayEnd || time >= displayEnd) { + reachedDisplayEnd = true; + curIndex++; + return; + } + /* Rows before displayBegin are skipped unless displayBegin carries the UNSPECIFIED_DISPLAY_BEGIN marker. */ if (displayBegin != UNSPECIFIED_DISPLAY_BEGIN && time < displayBegin) { + curIndex++; + return; + } + + processFilteredRecord(input, time, properColumnBuilders, passThroughIndexBuilder); + curIndex++; + } + + /** Handles one row that passed the display-range filter; {@code time} is the value already read from column 0. */ protected abstract void processFilteredRecord( + Record input, + long time, + List properColumnBuilders, + ColumnBuilder passThroughIndexBuilder); + + /** Folds the row into the window using curIndex as its row index; rows whose column 1 is null are ignored. */ protected final void updateWindow(WindowState windowState, Record input, long time) { + if (input.isNull(1)) { + return; + } + windowState.update(curIndex, time, valueOperator.read(input), valueOperator); + } + + /** Writes the window's candidates to the output builders; does nothing for a window that never received a row. */ protected final void outputWindow( + WindowState windowState, + List properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + if (!windowState.hasOutput()) { + return; + } + + Candidate[] candidates = + new Candidate[] { + windowState.first, windowState.bottom, windowState.top, windowState.last + }; + /* Order the four candidates by ascending row index. */ Arrays.sort(candidates, Comparator.comparingLong(candidate -> candidate.index)); + + Set 
emittedTimestamps = new LinkedHashSet<>(); + for (Candidate candidate : candidates) { + /* A candidate whose time was already written for this window is skipped, so each time appears at most once per window. */ if (!emittedTimestamps.add(candidate.time)) { + continue; + } + /* Output columns 0-3 receive window start, window end, candidate time and candidate value; the pass-through builder receives the candidate's row index. */ properColumnBuilders.get(0).writeLong(windowState.getOutputWindowStart()); + properColumnBuilders.get(1).writeLong(windowState.getOutputWindowEnd()); + properColumnBuilders.get(2).writeLong(candidate.time); + valueOperator.write(properColumnBuilders.get(3), candidate.value); + passThroughIndexBuilder.writeLong(candidate.index); + } + } + + /** Rounds {@code time} down to a multiple of slide using floor division. */ protected final long alignWindowStart(long time) { + return Math.floorDiv(time, slide) * slide; + } + + /** Returns {@code windowStart + size}. */ protected final long getWindowEnd(long windowStart) { + return windowStart + size; + } + } + + /** Processor for windows defined on the time axis. Open windows are kept in a deque in the order they were created. */ private static class TimeWindowM4DataProcessor extends AbstractM4DataProcessor { + private final Deque activeWindows = new ArrayDeque<>(); + + /* nextWindowStart is only meaningful once nextWindowStartInitialized is true. */ private boolean nextWindowStartInitialized = false; + private long nextWindowStart; + + private TimeWindowM4DataProcessor( + ValueOperator valueOperator, long size, long slide, long displayBegin, long displayEnd) { + super(valueOperator, size, slide, displayBegin, displayEnd); + } + + @Override + protected void processFilteredRecord( + Record input, + long time, + List properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + if (!nextWindowStartInitialized) { + nextWindowStart = + displayBegin == UNSPECIFIED_DISPLAY_BEGIN ? 
alignWindowStart(time) : displayBegin; + /* On the first filtered row the window grid is anchored at displayBegin when one was given, otherwise at the row time aligned down to a multiple of slide. */ nextWindowStartInitialized = true; + } + + /* Emit and remove windows from the front whose exclusive end is at or before this row's time. */ while (!activeWindows.isEmpty() && activeWindows.peekFirst().endExclusive <= time) { + outputWindow(activeWindows.removeFirst(), properColumnBuilders, passThroughIndexBuilder); + } + + /* With no open window and the next window already over, jump nextWindowStart forward by whole slides instead of creating empty windows one by one; for a positive slide the resulting window end lies after time. */ if (activeWindows.isEmpty() && getWindowEnd(nextWindowStart) <= time) { + long skipBase = time - size - nextWindowStart; + long skipCount = Math.floorDiv(skipBase, slide) + 1; + nextWindowStart += skipCount * slide; + } + + /* Open every window that starts at or before this row's time and before displayEnd. */ while (nextWindowStart <= time && nextWindowStart < displayEnd) { + activeWindows.addLast(new TimeWindowState(nextWindowStart, getWindowEnd(nextWindowStart))); + nextWindowStart += slide; + } + + /* The row is folded into every window that is currently open. */ for (TimeWindowState activeWindow : activeWindows) { + updateWindow(activeWindow, input, time); + } + } + + /** Emits all windows that are still open, front to back. */ @Override + public void finish( + List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + while (!activeWindows.isEmpty()) { + outputWindow(activeWindows.removeFirst(), properColumnBuilders, passThroughIndexBuilder); + } + } + } + + /** Processor for windows defined by row count. Window boundaries are positions in filteredRowCount, the number of rows that have passed the display-range filter so far. */ private static class CountWindowM4DataProcessor extends AbstractM4DataProcessor { + private final Deque activeWindows = new ArrayDeque<>(); + + private long filteredRowCount = 0; + private long nextWindowStart = 0; + + private CountWindowM4DataProcessor( + ValueOperator valueOperator, long size, long slide, long displayBegin, long displayEnd) { + super(valueOperator, size, slide, displayBegin, displayEnd); + } + + @Override + protected void processFilteredRecord( + Record input, + long time, + List properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + /* Emit and remove windows from the front whose exclusive end position has been reached by the filtered row count. */ while (!activeWindows.isEmpty() + && activeWindows.peekFirst().endExclusive <= filteredRowCount) { + outputWindow(activeWindows.removeFirst(), properColumnBuilders, passThroughIndexBuilder); + } + + /* Same forward jump as in the time-window processor, measured in row positions instead of time. */ if (activeWindows.isEmpty() && getWindowEnd(nextWindowStart) <= filteredRowCount) { + long skipBase = filteredRowCount - size - nextWindowStart; + long skipCount = 
Math.floorDiv(skipBase, slide) + 1; + nextWindowStart += skipCount * slide; + } + + /* Open every window whose start position is at or before the current filtered row position. */ while (nextWindowStart <= filteredRowCount) { + activeWindows.addLast(new CountWindowState(getWindowEnd(nextWindowStart))); + nextWindowStart += slide; + } + + /* The row is folded into every open window, then the filtered row position advances. */ for (CountWindowState activeWindow : activeWindows) { + updateCountWindow(activeWindow, input, time); + } + filteredRowCount++; + } + + /** Updates a count window with one row: the first row seen sets windowStart to its time, every row sets windowEnd to its time plus one (null-valued rows included), and non-null rows are then folded into the candidates. */ private void updateCountWindow(CountWindowState windowState, Record input, long time) { + /* Long.MIN_VALUE marks a window that has not seen a row yet. */ if (windowState.windowStart == Long.MIN_VALUE) { + windowState.windowStart = time; + } + windowState.windowEnd = time + 1; + /* NOTE(review): updateWindow performs the same isNull(1) check itself, so this early return looks redundant. */ if (input.isNull(1)) { + return; + } + updateWindow(windowState, input, time); + } + + /** Emits all windows that are still open, front to back. */ @Override + public void finish( + List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + while (!activeWindows.isEmpty()) { + outputWindow(activeWindows.removeFirst(), properColumnBuilders, passThroughIndexBuilder); + } + } + } +}