diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 8c2f741692900..43b60fc371b23 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -185,7 +185,6 @@ org.bouncycastle bcprov-jdk18on - test junit diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java index ea41dc9b579ea..dded7cbce3087 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java @@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT { @Before public void setUp() { - MultiEnvFactory.createEnv(2); + MultiEnvFactory.createEnv(1); env = MultiEnvFactory.getEnv(0); env.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true); // 10 min, assert that the operations will not time out diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 645fbdbaa68bf..599a43476c1fd 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -20,67 +20,280 @@ package org.apache.iotdb.pipe.it.single; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient; +import org.apache.iotdb.it.env.cluster.EnvUtils; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT1; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; +import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; +import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.File; +import java.net.ConnectException; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT1.class}) public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT { @Test - public void testOPCUASink() throws Exception { + public void testOPCUAServerSink() throws Exception { + int tcpPort = -1; + int httpsPort = -1; try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) { - TestUtils.executeNonQuery(env, "insert into root.db.d1(time,s1) values (1,1)", null); + TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null); - final Map connectorAttributes = new HashMap<>(); - connectorAttributes.put("sink", "opc-ua-sink"); - connectorAttributes.put("opcua.model", "client-server"); + final Map sinkAttributes = new HashMap<>(); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(Collections.emptyMap()) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); + sinkAttributes.put("sink", "opc-ua-sink"); + sinkAttributes.put("model", "client-server"); + sinkAttributes.put("opcua.security-policy", "None"); + + OpcUaClient opcUaClient; + DataValue value; + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + httpsPort = ports[1]; + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(Collections.singletonMap("user", "root")) + .setProcessorAttributes(Collections.emptyMap())) + .getCode()); + + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + opcUaClient.disconnect().get(); + break; + } + + // Create the region first to avoid tsFile parsing + TestUtils.executeNonQueries( + env, + Arrays.asList( + "create aligned timeSeries root.db.opc(value double, quality boolean, other int32)", + "create aligned timeSeries root.db.opc1(value double, quality boolean, other int32)", + "create aligned timeSeries root.db.opc2(value double, quality boolean, other int32)", + "insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)", + "insert into root.db.opc1(time, value, quality, other) values (0, 0, true, 1)", + "insert into root.db.opc2(time, value, quality, other) values (0, 0, true, 1)"), + null); + + while (true) { + final int[] ports = EnvUtils.searchAvailablePorts(); + tcpPort = ports[0]; + httpsPort = ports[1]; + sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort)); + sinkAttributes.put("https.port", Integer.toString(httpsPort)); + sinkAttributes.put("with-quality", "true"); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .alterPipe( + new TAlterPipeReq() + .setPipeName("testPipe") + .setIsReplaceAllConnectorAttributes(true) + .setConnectorAttributes(sinkAttributes) + .setProcessorAttributes(Collections.emptyMap()) + .setExtractorAttributes(Collections.emptyMap())) + .getCode()); + try { + opcUaClient = + getOpcUaClient( + "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root"); + } catch (final PipeException e) { + if (e.getCause() instanceof ConnectException) { + continue; + } else { + throw e; + } + } + break; + } + + // Test multiple regions + TestUtils.executeNonQueries( + env, + Arrays.asList( + "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", + "insert into root.db.opc1(time, value, quality, other) values (1, 1, false, 1)", + "insert into root.db.opc2(time, value, quality, other) values (1, 1, false, 1)"), + null); + + long startTime = System.currentTimeMillis(); + while (true) { + try { + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + + value = + opcUaClient + .readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc1")) + .get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + + value = + opcUaClient + .readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc2")) + .get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + break; + } catch (final Throwable t) { + if (System.currentTimeMillis() - startTime > 10_000L) { + throw t; + } + } + } + + TestUtils.executeNonQuery( + env, "insert into root.db.opc(time, quality) values (2, true)", null); + TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null); + + startTime = System.currentTimeMillis(); + while (true) { + try { + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime()); + Assert.assertEquals(new Variant(2.0), value.getValue()); + Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode()); + break; + } catch (final Throwable t) { + if (System.currentTimeMillis() - startTime > 10_000L) { + throw t; + } + } + } + + opcUaClient.disconnect().get(); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode()); // Test reconstruction - connectorAttributes.put("password", "test"); + sinkAttributes.put("password", "test"); + sinkAttributes.put("security-policy", "basic256sha256"); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) + new TCreatePipeReq("testPipe", sinkAttributes) .setExtractorAttributes(Collections.emptyMap()) .setProcessorAttributes(Collections.emptyMap())) .getCode()); + // Banned none, only allows basic256sha256 + final int finalTcpPort = tcpPort; + Assert.assertThrows( + PipeException.class, + () -> + getOpcUaClient( + "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb", + SecurityPolicy.None, + "root", + "root")); + // Test conflict - connectorAttributes.put("password", "conflict"); - Assert.assertEquals( - TSStatusCode.PIPE_ERROR.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(Collections.emptyMap()) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); + sinkAttributes.put("password", "conflict"); + try { + TestUtils.executeNonQuery( + env, + String.format( + "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')", + tcpPort, httpsPort), + null); + Assert.fail(); + } catch (final Exception e) { + Assert.assertEquals( + String.format( + "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port %s and https port %s's password **** conflicts to the new password ****, reject reusing.", + tcpPort, httpsPort), + e.getMessage()); + } + } finally { + if (tcpPort >= 0) { + final String lockPath = EnvUtils.getLockFilePath(tcpPort); + if (!new File(lockPath).delete()) { + System.out.printf("Delete lock file %s failed%n", lockPath); + } + } } } + + private static OpcUaClient getOpcUaClient( + final String nodeUrl, + final SecurityPolicy policy, + final String userName, + final String password) { + final IoTDBOpcUaClient client; + + final IdentityProvider provider = + Objects.nonNull(userName) + ? new UsernameProvider(userName, password) + : new AnonymousProvider(); + + final String securityDir = + CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE + + File.separatorChar + + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET)); + + client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false); + new ClientRunner(client, securityDir, password, userName, 10).run(); + return client.getClient(); + } } diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 3dcb2d19b0a78..b6d59c827f000 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -59,7 +59,9 @@ public Map getAttribute() { } public boolean hasAttribute(final String key) { - return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key)); + return attributes.containsKey(key) + || attributes.containsKey(KeyReducer.shallowReduce(key)) + || attributes.containsKey(KeyReducer.reduce(key)); } public boolean hasAnyAttributes(final String... keys) { @@ -76,7 +78,11 @@ public void addAttribute(final String key, String values) { } public String getString(final String key) { - final String value = attributes.get(key); + String value = attributes.get(key); + if (Objects.nonNull(value)) { + return value; + } + value = attributes.get(KeyReducer.shallowReduce(key)); return value != null ? value : attributes.get(KeyReducer.reduce(key)); } @@ -350,29 +356,52 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame } private static class KeyReducer { - - private static final Set PREFIXES = new HashSet<>(); + private static final Set FIRST_PREFIXES = new HashSet<>(); + private static final Set SECOND_PREFIXES = new HashSet<>(); static { - PREFIXES.add("extractor."); - PREFIXES.add("source."); - PREFIXES.add("processor."); - PREFIXES.add("connector."); - PREFIXES.add("sink."); + FIRST_PREFIXES.add("extractor."); + FIRST_PREFIXES.add("source."); + FIRST_PREFIXES.add("processor."); + FIRST_PREFIXES.add("connector."); + FIRST_PREFIXES.add("sink."); + + SECOND_PREFIXES.add("opcua."); } - static String reduce(final String key) { + static String shallowReduce(String key) { if (key == null) { return null; } final String lowerCaseKey = key.toLowerCase(); - for (final String prefix : PREFIXES) { + for (final String prefix : FIRST_PREFIXES) { if (lowerCaseKey.startsWith(prefix)) { return key.substring(prefix.length()); } } return key; } + + public static String reduce(String key) { + if (key == null) { + return null; + } + String lowerCaseKey = key.toLowerCase(); + for (final String prefix : FIRST_PREFIXES) { + if (lowerCaseKey.startsWith(prefix)) { + key = key.substring(prefix.length()); + lowerCaseKey = lowerCaseKey.substring(prefix.length()); + break; + } + } + for (final String prefix : SECOND_PREFIXES) { + if (lowerCaseKey.startsWith(prefix)) { + key = key.substring(prefix.length()); + break; + } + } + return key; + } } public static class ValueHider { diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 2a019e1fc436b..175a77e746e92 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -206,6 +206,18 @@ org.checkerframework checker-qual + + org.eclipse.milo + stack-client + + + org.eclipse.milo + sdk-client + + + org.bouncycastle + bcprov-jdk18on + org.eclipse.jetty jetty-http diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java index 5ce35e750538e..e198506122474 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java @@ -22,6 +22,10 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; @@ -30,44 +34,86 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; +import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; import org.eclipse.milo.opcua.sdk.server.OpcUaServer; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.File; import java.util.Arrays; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY; /** * Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into @@ -83,9 +129,21 @@ public class OpcUaSink implements PipeConnector { private static final Map> SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>(); + private static final Map> + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>(); private String serverKey; - private OpcUaNameSpace nameSpace; + private String nodeUrl; + private boolean isClientServerModel; + private @Nullable String valueName; + private @Nullable String qualityName; + private StatusCode defaultQuality; + + // Inner server + private @Nullable OpcUaNameSpace nameSpace; + + // Outer server + private @Nullable IoTDBOpcUaClient client; @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -104,13 +162,71 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY), false); - ; + + final PipeParameters parameters = validator.getParameters(); + if (validator + .getParameters() + .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY) + || parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY), + CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE)) { + validator.validate( + CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals, + String.format( + "When the OPC UA sink points to an outer server or sets 'with-quality' to true, the %s or %s must be %s.", + CONNECTOR_OPC_UA_MODEL_KEY, + SINK_OPC_UA_MODEL_KEY, + CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE), + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), + CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)); + } } @Override public void customize( final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) throws Exception { + final boolean withQuality = + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY), + CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE); + valueName = + withQuality + ? parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY), + CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE) + : null; + qualityName = + withQuality + ? parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY), + CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE) + : null; + defaultQuality = + getQuality( + withQuality + ? parameters.getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY, SINK_OPC_UA_DEFAULT_QUALITY_KEY), + CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE) + : CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE); + isClientServerModel = + parameters + .getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), + CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE) + .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE); + + nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY); + if (Objects.isNull(nodeUrl)) { + customizeServer(parameters); + } else { + customizeClient(parameters); + } + } + + private void customizeServer(final PipeParameters parameters) { final int tcpBindPort = parameters.getIntOrDefault( Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, SINK_OPC_UA_TCP_BIND_PORT_KEY), @@ -147,6 +263,21 @@ public void customize( CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY, SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY), CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE); + final Set securityPolicies = + (parameters.hasAnyAttributes( + CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY) + ? Arrays.stream( + parameters + .getStringByKeys( + CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY) + .replace(" ", "") + .split(",")) + : CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES.stream()) + .map(this::getSecurityPolicy) + .collect(Collectors.toSet()); + if (securityPolicies.isEmpty()) { + throw new PipeException("The security policy cannot be empty."); + } synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) { serverKey = httpsBindPort + ":" + tcpBindPort; @@ -165,25 +296,22 @@ public void customize( .setUser(user) .setPassword(password) .setSecurityDir(securityDir) - .setEnableAnonymousAccess(enableAnonymousAccess); + .setEnableAnonymousAccess(enableAnonymousAccess) + .setSecurityPolicies(securityPolicies); final OpcUaServer newServer = builder.build(); - nameSpace = - new OpcUaNameSpace( - newServer, - parameters - .getStringOrDefault( - Arrays.asList( - CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), - CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE) - .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE), - builder); + nameSpace = new OpcUaNameSpace(newServer, builder); nameSpace.startup(); newServer.startup().get(); return new Pair<>(new AtomicInteger(0), nameSpace); } else { oldValue .getRight() - .checkEquals(user, password, securityDir, enableAnonymousAccess); + .checkEquals( + user, + password, + securityDir, + enableAnonymousAccess, + securityPolicies); return oldValue; } } catch (final PipeException e) { @@ -197,6 +325,104 @@ public void customize( } } + private void customizeClient(final PipeParameters parameters) { + final SecurityPolicy policy = + getSecurityPolicy( + parameters + .getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY), + CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE) + .toUpperCase()); + + final IdentityProvider provider; + final String userName = + parameters.getStringByKeys(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY); + final String password = + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY), + CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + provider = + Objects.nonNull(userName) + ? new UsernameProvider(userName, password) + : new AnonymousProvider(); + + final String securityDir = + IoTDBConfig.addDataHomeDir( + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY), + CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE + + File.separatorChar + + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET)))); + + final long timeoutSeconds = + parameters.getLongOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY, SINK_OPC_UA_TIMEOUT_SECONDS_KEY), + CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE); + + synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) { + client = + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP + .compute( + nodeUrl, + (key, oldValue) -> { + if (Objects.isNull(oldValue)) { + final IoTDBOpcUaClient result = + new IoTDBOpcUaClient( + nodeUrl, + policy, + provider, + parameters.getBooleanOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_HISTORIZING_KEY, + SINK_OPC_UA_HISTORIZING_KEY), + CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE)); + final ClientRunner runner = + new ClientRunner(result, securityDir, password, userName, timeoutSeconds); + runner.run(); + return new Pair<>(new AtomicInteger(0), result); + } + oldValue.getRight().checkEquals(userName, password, securityDir, policy); + return oldValue; + }) + .getRight(); + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl).getLeft().incrementAndGet(); + } + } + + private SecurityPolicy getSecurityPolicy(final String securityPolicy) { + switch (securityPolicy.toUpperCase()) { + case CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE: + return SecurityPolicy.None; + case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE: + return SecurityPolicy.Basic128Rsa15; + case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE: + return SecurityPolicy.Basic256; + case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE: + return SecurityPolicy.Basic256Sha256; + case CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE: + return SecurityPolicy.Aes128_Sha256_RsaOaep; + case CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE: + return SecurityPolicy.Aes256_Sha256_RsaPss; + default: + throw new PipeException( + "The security policy can only be 'None', 'Basic128Rsa15', 'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or 'Aes256_Sha256_RsaPss'."); + } + } + + private StatusCode getQuality(final String quality) { + switch (quality.toUpperCase()) { + case CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE: + return StatusCode.GOOD; + case CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE: + return StatusCode.BAD; + case CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE: + return StatusCode.UNCERTAIN; + default: + throw new PipeException("The default quality can only be 'GOOD', 'BAD' or 'UNCERTAIN'."); + } + } + @Override public void handshake() throws Exception { // Server side, do nothing @@ -214,7 +440,19 @@ public void transfer(final Event event) throws Exception { @Override public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { - transferByTablet(tabletInsertionEvent, LOGGER, tablet -> nameSpace.transfer(tablet)); + transferByTablet( + tabletInsertionEvent, + LOGGER, + tablet -> { + if (Objects.nonNull(nameSpace)) { + nameSpace.transfer(tablet, this); + } else if (Objects.nonNull(client)) { + client.transfer(tablet, this); + } else { + throw new PipeException( + "No OPC client or server is specified when transferring tablet"); + } + }); } public static void transferByTablet( @@ -299,5 +537,45 @@ public void close() throws Exception { } } } + + if (nodeUrl == null) { + return; + } + + synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) { + final Pair pair = + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl); + if (pair == null) { + return; + } + + if (pair.getLeft().decrementAndGet() <= 0) { + try { + pair.getRight().disconnect(); + } finally { + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl); + } + } + } + } + + /////////////////////////////// Getter /////////////////////////////// + + public boolean isClientServerModel() { + return isClientServerModel; + } + + @Nullable + public String getValueName() { + return valueName; + } + + @Nullable + public String getQualityName() { + return qualityName; + } + + public StatusCode getDefaultQuality() { + return defaultQuality; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java new file mode 100644 index 0000000000000..9e3b46d463a17 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.opcua.client; + +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator; +import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.Security; +import java.util.Objects; + +import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; + +public class ClientRunner { + + private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class); + + static { + // Required for SecurityPolicy.Aes256_Sha256_RsaPss + Security.addProvider(new BouncyCastleProvider()); + } + + private final IoTDBOpcUaClient configurableUaClient; + private final Path securityDir; + private final String password; + private final long timeoutSeconds; + + // For conflict checking + private final String user; + + public ClientRunner( + final IoTDBOpcUaClient configurableUaClient, + final String securityDir, + final String password, + final String user, + final long timeoutSeconds) { + this.configurableUaClient = configurableUaClient; + this.securityDir = Paths.get(securityDir); + this.password = password; + this.user = user; + this.timeoutSeconds = timeoutSeconds; + configurableUaClient.setRunner(this); + } + + private OpcUaClient createClient() throws Exception { + Files.createDirectories(securityDir); + if (!Files.exists(securityDir)) { + throw new Exception("unable to create security dir: " + securityDir); + } + + final File pkiDir = securityDir.resolve("pki").toFile(); + + logger.info("security dir: {}", securityDir.toAbsolutePath()); + logger.info("security pki dir: {}", pkiDir.getAbsolutePath()); + + final IoTDBKeyStoreLoaderClient loader = + new IoTDBKeyStoreLoaderClient().load(securityDir, password.toCharArray()); + + final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir); + + final DefaultClientCertificateValidator certificateValidator = + new DefaultClientCertificateValidator(trustListManager); + + return OpcUaClient.create( + configurableUaClient.getNodeUrl(), + endpoints -> endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(), + configBuilder -> + configBuilder + .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA client")) + .setApplicationUri("urn:apache:iotdb:opc-ua-client") + .setKeyPair(loader.getClientKeyPair()) + .setCertificate(loader.getClientCertificate()) + .setCertificateChain(loader.getClientCertificateChain()) + .setCertificateValidator(certificateValidator) + .setIdentityProvider(configurableUaClient.getIdentityProvider()) + .setRequestTimeout(uint(timeoutSeconds * 1000L)) + .setConnectTimeout(uint(timeoutSeconds * 1000L)) + .setMaxResponseMessageSize(uint(0)) + .build()); + } + + public void run() { + try { + final OpcUaClient client = createClient(); + + try { + configurableUaClient.run(client); + } catch (final Exception e) { + throw new PipeException( + "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } catch (final Exception e) { + throw new PipeException( + "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } + + long getTimeoutSeconds() { + return timeoutSeconds; + } + + /////////////////////////////// Conflict detection /////////////////////////////// + + void checkEquals( + final String user, + final String password, + final Path securityDir, + final SecurityPolicy securityPolicy) { + checkEquals("user", this.user, user); + checkEquals("password", this.password, password); + checkEquals( + "security dir", + FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()), + FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString())); + checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(), securityPolicy); + } + + private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) { + if (!Objects.equals(thisAttr, thatAttr)) { + if (attrName.equals("password")) { + thisAttr = "****"; + thatAttr = "****"; + } + throw new PipeException( + String.format( + "The existing server with nodeUrl %s's %s %s conflicts to the new %s %s, reject reusing.", + configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName, thatAttr)); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java new file mode 100644 index 0000000000000..bfaf378822c31 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.opcua.client; + +import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil; +import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder; +import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.Key; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.cert.X509Certificate; +import java.util.Arrays; +import java.util.regex.Pattern; + +class IoTDBKeyStoreLoaderClient { + + private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class); + + private static final Pattern IP_ADDR_PATTERN = + Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$"); + + private static final String CLIENT_ALIAS = "client-ai"; + + private X509Certificate[] clientCertificateChain; + private X509Certificate clientCertificate; + private KeyPair clientKeyPair; + + IoTDBKeyStoreLoaderClient load(final Path baseDir, final char[] password) throws Exception { + final KeyStore keyStore = KeyStore.getInstance("PKCS12"); + + final Path serverKeyStore = baseDir.resolve("iotdb-client.pfx"); + + logger.info("Loading KeyStore at {}.", serverKeyStore); + + if (!Files.exists(serverKeyStore)) { + keyStore.load(null, password); + + final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048); + + final SelfSignedCertificateBuilder builder = + new SelfSignedCertificateBuilder(keyPair) + .setCommonName("Apache IoTDB OPC UA client") + .setOrganization("Apache") + .setOrganizationalUnit("dev") + .setLocalityName("Beijing") + .setStateName("China") + .setCountryCode("CN") + .setApplicationUri("urn:apache:iotdb:opc-ua-client") + .addDnsName("localhost") + .addIpAddress("127.0.0.1"); + + // Get as many hostnames and IP addresses as we can listed in the certificate. + for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) { + if (IP_ADDR_PATTERN.matcher(hostname).matches()) { + builder.addIpAddress(hostname); + } else { + builder.addDnsName(hostname); + } + } + + final X509Certificate certificate = builder.build(); + + keyStore.setKeyEntry( + CLIENT_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate}); + try (OutputStream out = Files.newOutputStream(serverKeyStore)) { + keyStore.store(out, password); + } + } else { + try (InputStream in = Files.newInputStream(serverKeyStore)) { + keyStore.load(in, password); + } + } + + final Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, password); + if (clientPrivateKey instanceof PrivateKey) { + clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS); + + clientCertificateChain = + Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS)) + .map(X509Certificate.class::cast) + .toArray(X509Certificate[]::new); + + final PublicKey serverPublicKey = clientCertificate.getPublicKey(); + clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) clientPrivateKey); + } + + return this; + } + + X509Certificate getClientCertificate() { + return clientCertificate; + } + + public X509Certificate[] getClientCertificateChain() { + return clientCertificateChain; + } + + KeyPair getClientKeyPair() { + return clientKeyPair; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java new file mode 100644 index 0000000000000..2019c0fe83336 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.opcua.client; + +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.sdk.core.AccessLevel; +import org.eclipse.milo.opcua.sdk.core.ValueRanks; +import org.eclipse.milo.opcua.stack.core.Identifiers; +import org.eclipse.milo.opcua.stack.core.StatusCodes; +import org.eclipse.milo.opcua.stack.core.UaException; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; +import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; +import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject; +import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName; +import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; +import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned; +import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass; +import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem; +import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse; +import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult; +import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription; +import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes; +import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.function.Predicate; + +import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType; +import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc; +import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout; + +public class IoTDBOpcUaClient { + private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class); + + // Customized nodes + private static final int NAME_SPACE_INDEX = 2; + + // Useless for a server only accept client writing + private static final double SAMPLING_INTERVAL_PLACEHOLDER = 500; + private final String nodeUrl; + + private final SecurityPolicy securityPolicy; + private final IdentityProvider identityProvider; + private OpcUaClient client; + private final boolean historizing; + private ClientRunner runner; + + public IoTDBOpcUaClient( + final String nodeUrl, + final SecurityPolicy securityPolicy, + final IdentityProvider identityProvider, + final boolean historizing) { + this.nodeUrl = nodeUrl; + this.securityPolicy = securityPolicy; + this.identityProvider = identityProvider; + this.historizing = historizing; + } + + public void run(final OpcUaClient client) throws Exception { + // synchronous connect + this.client = client; + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds() * 1000L) { + try { + client.connect().get(); + } catch (final ExecutionException e) { + if (e.getCause() instanceof UaException + && ((UaException) e.getCause()).getStatusCode().getValue() == Bad_Timeout) { + Thread.sleep(1000L); + continue; + } + throw e; + } + break; + } + } + + // Only support tree model & client-server + public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception { + OpcUaNameSpace.transferTabletForClientServerModel( + tablet, sink, this::transferTabletRowForClientServerModel); + } + + private void transferTabletRowForClientServerModel( + final String[] segments, + final List measurementSchemas, + final List timestamps, + final List values, + final OpcUaSink sink) + throws Exception { + StatusCode currentQuality = sink.getDefaultQuality(); + Object value = null; + long timestamp = 0; + NodeId nodeId = null; + NodeId opcDataType = null; + + for (int i = 0; i < measurementSchemas.size(); ++i) { + if (Objects.isNull(values.get(i))) { + continue; + } + final String name = measurementSchemas.get(i).getMeasurementId(); + final TSDataType type = measurementSchemas.get(i).getType(); + if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) { + if (!type.equals(TSDataType.BOOLEAN)) { + throw new UnsupportedOperationException( + "The quality value only supports boolean type, while true == GOOD and false == BAD."); + } + currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD; + continue; + } + if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) { + PipeLogger.log( + LOGGER::warn, + "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\""); + continue; + } + nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments)); + + final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0)); + value = values.get(i); + timestamp = utcTimestamp; + opcDataType = convertToOpcDataType(type); + } + if (Objects.isNull(value)) { + return; + } + + final Variant variant = new Variant(value); + final DataValue dataValue = + new DataValue(variant, currentQuality, new DateTime(timestamp), new DateTime()); + StatusCode writeStatus = client.writeValue(nodeId, dataValue).get(); + + if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) { + final AddNodesResponse addStatus = + client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get(); + for (final AddNodesResult result : addStatus.getResults()) { + if (!result.getStatusCode().equals(StatusCode.GOOD) + && !(result.getStatusCode().getValue() == StatusCodes.Bad_NodeIdExists)) { + throw new PipeException( + "Failed to create nodes after transfer data value, creation status: " + + addStatus + + getErrorString(segments, opcDataType, value, writeStatus)); + } + } + writeStatus = client.writeValue(nodeId, dataValue).get(); + if (writeStatus.getValue() != StatusCode.GOOD.getValue()) { + throw new PipeException( + "Failed to transfer dataValue after successfully created nodes" + + getErrorString(segments, opcDataType, value, writeStatus)); + } + } else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) { + throw new PipeException( + "Failed to transfer dataValue" + + getErrorString(segments, opcDataType, value, writeStatus)); + } + } + + private static String getErrorString( + final String[] segments, + final NodeId dataType, + final Object value, + final StatusCode writeStatus) { + return ", segments: " + + Arrays.toString(segments) + + ", dataType: " + + dataType + + ", value: " + + value + + ", error: " + + writeStatus; + } + + public List getNodesToAdd( + final String[] segments, final NodeId opcDataType, final Variant initialValue) { + final List addNodesItems = new ArrayList<>(); + final StringBuilder sb = new StringBuilder(segments[0]); + ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX, segments[0]).expanded(); + addNodesItems.add( + new AddNodesItem( + Identifiers.ObjectsFolder.expanded(), + Identifiers.Organizes, + curNodeId, + new QualifiedName(NAME_SPACE_INDEX, segments[0]), + NodeClass.Object, + ExtensionObject.encode( + client.getStaticSerializationContext(), createFolderAttributes(segments[0])), + Identifiers.FolderType.expanded())); + + // segments.length >= 3 + for (int i = 1; i < segments.length - 1; ++i) { + sb.append("/").append(segments[i]); + final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(); + addNodesItems.add( + new AddNodesItem( + curNodeId, + Identifiers.Organizes, + nextId, + new QualifiedName(NAME_SPACE_INDEX, segments[i]), + NodeClass.Object, + ExtensionObject.encode( + client.getStaticSerializationContext(), createFolderAttributes(segments[i])), + Identifiers.FolderType.expanded())); + curNodeId = nextId; + } + + final String measurementName = segments[segments.length - 1]; + sb.append("/").append(measurementName); + addNodesItems.add( + new AddNodesItem( + curNodeId, + Identifiers.Organizes, + new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(), + new QualifiedName(NAME_SPACE_INDEX, measurementName), + NodeClass.Variable, + ExtensionObject.encode( + client.getStaticSerializationContext(), + createMeasurementAttributes(measurementName, opcDataType, initialValue)), + Identifiers.BaseDataVariableType.expanded())); + + return addNodesItems; + } + + public void disconnect() throws Exception { + if (Objects.nonNull(client)) { + client.disconnect().get(); + } + } + + /////////////////////////////// Getter /////////////////////////////// + + String getNodeUrl() { + return nodeUrl; + } + + Predicate endpointFilter() { + return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri()); + } + + SecurityPolicy getSecurityPolicy() { + return securityPolicy; + } + + IdentityProvider getIdentityProvider() { + return identityProvider; + } + + @TestOnly + public OpcUaClient getClient() { + return client; + } + + /////////////////////////////// Attribute creator /////////////////////////////// + + private VariableAttributes createMeasurementAttributes( + final String name, final NodeId objectType, final Variant initialValue) { + return new VariableAttributes( + Unsigned.uint(0xFFFF), // specifiedAttributes + LocalizedText.english(name), + LocalizedText.english(name), + Unsigned.uint(0), // writeMask + Unsigned.uint(0), // userWriteMask + initialValue, + objectType, + ValueRanks.Scalar, + null, // arrayDimensions + AccessLevel.toValue(AccessLevel.READ_WRITE), + AccessLevel.toValue(AccessLevel.READ_WRITE), + SAMPLING_INTERVAL_PLACEHOLDER, + historizing); + } + + private static ObjectAttributes createFolderAttributes(final String name) { + return new ObjectAttributes( + Unsigned.uint(0xFFFF), // specifiedAttributes + LocalizedText.english(name), + LocalizedText.english(name), + Unsigned.uint(0), // writeMask + Unsigned.uint(0), // userWriteMask + null // notifier + ); + } + + /////////////////////////////// Conflict detection /////////////////////////////// + + public void setRunner(ClientRunner runner) { + this.runner = runner; + } + + public void checkEquals( + final String user, + final String password, + final String securityDir, + final SecurityPolicy securityPolicy) { + runner.checkEquals(user, password, Paths.get(securityDir), securityPolicy); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java similarity index 98% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java index b17f27532d7ae..56b231fb4608a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.sink.protocol.opcua; +package org.apache.iotdb.db.pipe.sink.protocol.opcua.server; import org.apache.iotdb.commons.utils.FileUtils; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java similarity index 70% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index 465a2f6745585..04cc251d6558b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -17,10 +17,12 @@ * under the License. */ -package org.apache.iotdb.db.pipe.sink.protocol.opcua; +package org.apache.iotdb.db.pipe.sink.protocol.opcua.server; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; @@ -46,33 +48,35 @@ import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel; import org.eclipse.milo.opcua.stack.core.Identifiers; import org.eclipse.milo.opcua.stack.core.UaException; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.file.Paths; import java.sql.Date; import java.time.LocalDate; import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.UUID; public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { + private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class); public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server"; - private final boolean isClientServerModel; private final SubscriptionModel subscriptionModel; private final OpcUaServerBuilder builder; - OpcUaNameSpace( - final OpcUaServer server, - final boolean isClientServerModel, - final OpcUaServerBuilder builder) { + public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) { super(server, NAMESPACE_URI); - this.isClientServerModel = isClientServerModel; this.builder = builder; subscriptionModel = new SubscriptionModel(server, this); @@ -93,25 +97,65 @@ public void shutdown() { }); } - void transfer(final Tablet tablet) throws UaException { - if (isClientServerModel) { - transferTabletForClientServerModel(tablet); + public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception { + if (sink.isClientServerModel()) { + transferTabletForClientServerModel(tablet, sink, this::transferTabletRowForClientServerModel); } else { transferTabletForPubSubModel(tablet); } } - private void transferTabletForClientServerModel(final Tablet tablet) { + public static void transferTabletForClientServerModel( + final Tablet tablet, final OpcUaSink sink, final TabletRowConsumer consumer) + throws Exception { + final List schemas = tablet.getSchemas(); + final List newSchemas = new ArrayList<>(); new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); - final String[] segments = tablet.deviceId.split("\\."); + final List timestamps = new ArrayList<>(); + final List values = new ArrayList<>(); + + for (int i = 0; i < schemas.size(); ++i) { + for (int j = tablet.rowSize - 1; j >= 0; --j) { + if (tablet.bitMaps == null || tablet.bitMaps[i] == null || !tablet.bitMaps[i].isMarked(j)) { + newSchemas.add(schemas.get(i)); + timestamps.add(tablet.timestamps[j]); + values.add(getTabletObjectValue4Opc(tablet.values[i], j, schemas.get(i).getType())); + break; + } + } + } + + consumer.accept(tablet.deviceId.split("\\."), newSchemas, timestamps, values, sink); + } + + @FunctionalInterface + public interface TabletRowConsumer { + void accept( + final String[] segments, + final List measurementSchemas, + final List timestamps, + final List values, + final OpcUaSink sink) + throws Exception; + } + + private void transferTabletRowForClientServerModel( + final String[] segments, + final List measurementSchemas, + final List timestamps, + final List values, + final OpcUaSink sink) { if (segments.length == 0) { throw new PipeRuntimeCriticalException("The segments of tablets must exist"); } final StringBuilder currentStr = new StringBuilder(); UaNode folderNode = null; NodeId folderNodeId; - for (final String segment : segments) { + for (int i = 0; + i < (Objects.isNull(sink.getValueName()) ? segments.length : segments.length - 1); + ++i) { + final String segment = segments[i]; final UaNode nextFolderNode; currentStr.append(segment); @@ -150,48 +194,61 @@ private void transferTabletForClientServerModel(final Tablet tablet) { () -> new PipeRuntimeCriticalException( String.format( - "The folder node for %s does not exist.", tablet.deviceId))); + "The folder node for %s does not exist.", + Arrays.toString(segments)))); } } final String currentFolder = currentStr.toString(); - for (int i = 0; i < tablet.getSchemas().size(); ++i) { - final MeasurementSchema measurementSchema = tablet.getSchemas().get(i); - final String name = measurementSchema.getMeasurementId(); - final TSDataType type = measurementSchema.getType(); - final NodeId nodeId = newNodeId(currentFolder + name); - final UaVariableNode measurementNode; - int lastNonnullIndex = -1; - for (int j = tablet.rowSize - 1; j >= 0; --j) { - if (!tablet.bitMaps[i].isMarked(j)) { - lastNonnullIndex = j; - break; + StatusCode currentQuality = sink.getDefaultQuality(); + UaVariableNode valueNode = null; + Object value = null; + long timestamp = 0; + + for (int i = 0; i < measurementSchemas.size(); ++i) { + if (Objects.isNull(values.get(i))) { + continue; + } + final String name = measurementSchemas.get(i).getMeasurementId(); + final TSDataType type = measurementSchemas.get(i).getType(); + if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) { + if (!type.equals(TSDataType.BOOLEAN)) { + throw new UnsupportedOperationException( + "The quality value only supports boolean type, while true == GOOD and false == BAD."); } + currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD; + continue; } - - if (lastNonnullIndex == -1) { + if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) { + PipeLogger.log( + LOGGER::warn, + "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\""); continue; } - - final long utcTimestamp = timestampToUtc(tablet.timestamps[lastNonnullIndex]); - final DataValue value = + final String nodeName = + Objects.isNull(sink.getValueName()) ? name : segments[segments.length - 1]; + final NodeId nodeId = newNodeId(currentFolder + nodeName); + final UaVariableNode measurementNode; + final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0)); + final DataValue dataValue = new DataValue( - new Variant(getTabletObjectValue4Opc(tablet.values[i], lastNonnullIndex, type)), - StatusCode.GOOD, + new Variant(values.get(i)), + currentQuality, new DateTime(utcTimestamp), new DateTime()); + if (!getNodeManager().containsNode(nodeId)) { measurementNode = new UaVariableNode.UaVariableNodeBuilder(getNodeContext()) - .setNodeId(newNodeId(currentFolder + name)) + .setNodeId(nodeId) .setAccessLevel(AccessLevel.READ_WRITE) .setUserAccessLevel(AccessLevel.READ_ONLY) - .setBrowseName(newQualifiedName(name)) - .setDisplayName(LocalizedText.english(name)) + .setBrowseName(newQualifiedName(nodeName)) + .setDisplayName(LocalizedText.english(nodeName)) .setDataType(convertToOpcDataType(type)) .setTypeDefinition(Identifiers.BaseDataVariableType) - .setValue(value) + .setValue(dataValue) .build(); getNodeManager().addNode(measurementNode); if (Objects.nonNull(folderNode)) { @@ -215,15 +272,25 @@ private void transferTabletForClientServerModel(final Tablet tablet) { String.format("The Node %s does not exist.", nodeId))); } - if (Objects.isNull(measurementNode.getValue()) - || Objects.isNull(measurementNode.getValue().getSourceTime()) - || measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) { - measurementNode.setValue( + if (Objects.isNull(sink.getValueName())) { + if (Objects.isNull(measurementNode.getValue()) + || Objects.isNull(measurementNode.getValue().getSourceTime()) + || measurementNode.getValue().getSourceTime().getUtcTime() < utcTimestamp) { + measurementNode.setValue(dataValue); + } + } else { + valueNode = measurementNode; + value = values.get(i); + timestamp = utcTimestamp; + } + } + if (Objects.nonNull(valueNode)) { + if (Objects.isNull(valueNode.getValue()) + || Objects.isNull(valueNode.getValue().getSourceTime()) + || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) { + valueNode.setValue( new DataValue( - new Variant(getTabletObjectValue4Opc(tablet.values[i], lastNonnullIndex, type)), - StatusCode.GOOD, - new DateTime(utcTimestamp), - new DateTime())); + new Variant(value), currentQuality, new DateTime(timestamp), new DateTime())); } } } @@ -254,7 +321,7 @@ private static Object getTabletObjectValue4Opc( } } - private static long timestampToUtc(final long timeStamp) { + public static long timestampToUtc(final long timeStamp) { return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L + 116444736000000000L; } @@ -271,9 +338,9 @@ private void transferTabletForPubSubModel(final Tablet tablet) throws UaExceptio .getEventFactory() .createEvent( new NodeId(getNamespaceIndex(), UUID.randomUUID()), Identifiers.BaseEventType); + // Use eventNode here because other nodes doesn't support values and times simultaneously for (int columnIndex = 0; columnIndex < tablet.getSchemas().size(); ++columnIndex) { - final TSDataType dataType = tablet.getSchemas().get(columnIndex).getType(); // Source name --> Sensor path, like root.test.d_0.s_0 @@ -309,8 +376,8 @@ private void transferTabletForPubSubModel(final Tablet tablet) throws UaExceptio case DATE: eventNode.setMessage( LocalizedText.english( - (((LocalDate[]) tablet.values[columnIndex])[rowIndex]) - .atStartOfDay(ZoneId.systemDefault()) + ((LocalDate[]) tablet.values[columnIndex]) + [rowIndex].atStartOfDay(ZoneId.systemDefault()) .toString())); break; case INT64: @@ -355,7 +422,7 @@ private void transferTabletForPubSubModel(final Tablet tablet) throws UaExceptio eventNode.delete(); } - private NodeId convertToOpcDataType(final TSDataType type) { + public static NodeId convertToOpcDataType(final TSDataType type) { switch (type) { case BOOLEAN: return Identifiers.Boolean; @@ -403,11 +470,13 @@ public void onMonitoringModeChanged(final List monitoredItems) { /////////////////////////////// Conflict detection /////////////////////////////// - void checkEquals( + public void checkEquals( final String user, final String password, final String securityDir, - final boolean enableAnonymousAccess) { - builder.checkEquals(user, password, Paths.get(securityDir), enableAnonymousAccess); + final boolean enableAnonymousAccess, + final Set securityPolicies) { + builder.checkEquals( + user, password, Paths.get(securityDir), enableAnonymousAccess, securityPolicies); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java similarity index 82% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java index bc2df4839e2bc..61818ecf852e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java @@ -17,9 +17,8 @@ * under the License. */ -package org.apache.iotdb.db.pipe.sink.protocol.opcua; +package org.apache.iotdb.db.pipe.sink.protocol.opcua.server; -import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.pipe.api.exception.PipeException; import org.eclipse.milo.opcua.sdk.server.OpcUaServer; @@ -58,6 +57,7 @@ import java.nio.file.Paths; import java.security.KeyPair; import java.security.cert.X509Certificate; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; @@ -84,48 +84,45 @@ public class OpcUaServerBuilder implements Closeable { private String password; private Path securityDir; private boolean enableAnonymousAccess; + private Set securityPolicies; private DefaultTrustListManager trustListManager; - OpcUaServerBuilder() { - tcpBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; - httpsBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; - user = PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; - password = PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; - securityDir = Paths.get(PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE); - enableAnonymousAccess = PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE; - } - - OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { + public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { this.tcpBindPort = tcpBindPort; return this; } - OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) { + public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) { this.httpsBindPort = httpsBindPort; return this; } - OpcUaServerBuilder setUser(final String user) { + public OpcUaServerBuilder setUser(final String user) { this.user = user; return this; } - OpcUaServerBuilder setPassword(final String password) { + public OpcUaServerBuilder setPassword(final String password) { this.password = password; return this; } - OpcUaServerBuilder setSecurityDir(final String securityDir) { + public OpcUaServerBuilder setSecurityDir(final String securityDir) { this.securityDir = Paths.get(securityDir); return this; } - OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) { + public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) { this.enableAnonymousAccess = enableAnonymousAccess; return this; } - OpcUaServer build() throws Exception { + public OpcUaServerBuilder setSecurityPolicies(final Set securityPolicies) { + this.securityPolicies = securityPolicies; + return this; + } + + public OpcUaServer build() throws Exception { Files.createDirectories(securityDir); if (!Files.exists(securityDir)) { throw new PipeException("Unable to create security dir: " + securityDir); @@ -247,30 +244,36 @@ private Set createEndpointConfigurations( USER_TOKEN_POLICY_USERNAME, USER_TOKEN_POLICY_X509); - final EndpointConfiguration.Builder noSecurityBuilder = - builder - .copy() - .setSecurityPolicy(SecurityPolicy.None) - .setSecurityMode(MessageSecurityMode.None); - - endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, tcpBindPort)); - endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, httpsBindPort)); - - endpointConfigurations.add( - buildTcpEndpoint( - builder - .copy() - .setSecurityPolicy(SecurityPolicy.Basic256Sha256) - .setSecurityMode(MessageSecurityMode.SignAndEncrypt), - tcpBindPort)); - - endpointConfigurations.add( - buildHttpsEndpoint( - builder - .copy() - .setSecurityPolicy(SecurityPolicy.Basic256Sha256) - .setSecurityMode(MessageSecurityMode.Sign), - httpsBindPort)); + final Set securityPolicySet = new HashSet<>(securityPolicies); + if (securityPolicySet.contains(SecurityPolicy.None)) { + final EndpointConfiguration.Builder noSecurityBuilder = + builder + .copy() + .setSecurityPolicy(SecurityPolicy.None) + .setSecurityMode(MessageSecurityMode.None); + + endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, tcpBindPort)); + endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, httpsBindPort)); + securityPolicySet.remove(SecurityPolicy.None); + } + + for (final SecurityPolicy securityPolicy : securityPolicySet) { + endpointConfigurations.add( + buildTcpEndpoint( + builder + .copy() + .setSecurityPolicy(securityPolicy) + .setSecurityMode(MessageSecurityMode.SignAndEncrypt), + tcpBindPort)); + + endpointConfigurations.add( + buildHttpsEndpoint( + builder + .copy() + .setSecurityPolicy(securityPolicy) + .setSecurityMode(MessageSecurityMode.Sign), + httpsBindPort)); + } final EndpointConfiguration.Builder discoveryBuilder = builder @@ -309,7 +312,8 @@ void checkEquals( final String user, final String password, final Path securityDir, - final boolean enableAnonymousAccess) { + final boolean enableAnonymousAccess, + final Set securityPolicies) { checkEquals("user", this.user, user); checkEquals("password", this.password, password); checkEquals( @@ -317,10 +321,15 @@ void checkEquals( FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()), FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString())); checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess); + checkEquals("securityPolicies", this.securityPolicies, securityPolicies); } - private void checkEquals(final String attrName, final Object thisAttr, final Object thatAttr) { + private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) { if (!Objects.equals(thisAttr, thatAttr)) { + if (attrName.equals("password")) { + thisAttr = "****"; + thatAttr = "****"; + } throw new PipeException( String.format( "The existing server with tcp port %s and https port %s's %s %s conflicts to the new %s %s, reject reusing.", diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index 2cf154053afed..fae6430876273 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -21,16 +21,26 @@ import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; +import java.security.SecureRandom; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; public class PipeSinkTest { @@ -92,4 +102,83 @@ public void testIoTDBThriftAsyncConnectorToOthers() { Assert.fail(); } } + + @Test + public void testOpcUaSink() { + final List schemaList = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.INT64)); + + final Tablet tablet = new Tablet("root.db.d1.vector6", schemaList, 100); + + long timestamp = System.currentTimeMillis(); + for (long row = 0; row < 100; row++) { + final int rowSize = tablet.rowSize; + tablet.addTimestamp(rowSize, timestamp); + for (int i = 0; i < 2; i++) { + tablet.addValue( + schemaList.get(i).getMeasurementId(), rowSize, new SecureRandom().nextLong()); + } + timestamp++; + } + + final List opcSchemaList = + Arrays.asList( + new MeasurementSchema("value1", TSDataType.INT64), + new MeasurementSchema("quality1", TSDataType.BOOLEAN)); + final Tablet qualityTablet = new Tablet("root.db.d1.vector6.s3", opcSchemaList, 100); + + timestamp = System.currentTimeMillis(); + for (long row = 0; row < 100; row++) { + final int rowSize = qualityTablet.rowSize; + qualityTablet.addTimestamp(rowSize, timestamp); + qualityTablet.addValue( + opcSchemaList.get(0).getMeasurementId(), rowSize, new SecureRandom().nextLong()); + qualityTablet.addValue(opcSchemaList.get(1).getMeasurementId(), rowSize, true); + timestamp++; + } + + try (final OpcUaSink qualityOPC = new OpcUaSink(); + final OpcUaSink normalOPC = new OpcUaSink()) { + final PipeTaskRuntimeConfiguration configuration = + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("temp", 0, 1)); + qualityOPC.customize( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY, "true"); + put(PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY, "value1"); + put(PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY, "quality1"); + } + }), + configuration); + normalOPC.customize( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName()); + } + }), + configuration); + final PipeRawTabletInsertionEvent event = + new PipeRawTabletInsertionEvent(tablet, false, "pipe", 0L, null, null, false); + event.increaseReferenceCount(""); + normalOPC.transfer(event); + // Shall not throw + qualityOPC.transfer(event); + event.decreaseReferenceCount("", false); + + qualityOPC.transfer( + new PipeRawTabletInsertionEvent(qualityTablet, false, "pipe", 0L, null, null, false)); + + } catch (Exception e) { + Assert.fail(); + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index 1cd9cfb33c758..2eaf6f903de48 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.apache.iotdb.commons.conf.IoTDBConstant.MB; @@ -179,6 +180,57 @@ public class PipeSinkConstant { "sink.opcua.enable-anonymous-access"; public static final boolean CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE = true; + public static final String CONNECTOR_OPC_UA_WITH_QUALITY_KEY = "connector.opcua.with-quality"; + public static final String SINK_OPC_UA_WITH_QUALITY_KEY = "sink.opcua.with-quality"; + public static final boolean CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE = false; + + public static final String CONNECTOR_OPC_UA_VALUE_NAME_KEY = "connector.opcua.value-name"; + public static final String SINK_OPC_UA_VALUE_NAME_KEY = "sink.opcua.value-name"; + public static final String CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE = "value"; + + public static final String CONNECTOR_OPC_UA_QUALITY_NAME_KEY = "connector.opcua.quality-name"; + public static final String SINK_OPC_UA_QUALITY_NAME_KEY = "sink.opcua.quality-name"; + public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE = "quality"; + + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY = + "connector.opcua.default-quality"; + public static final String SINK_OPC_UA_DEFAULT_QUALITY_KEY = "sink.opcua.default-quality"; + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE = "GOOD"; + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE = "BAD"; + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE = "UNCERTAIN"; + + public static final String CONNECTOR_OPC_UA_NODE_URL_KEY = "connector.opcua.node-url"; + public static final String SINK_OPC_UA_NODE_URL_KEY = "sink.opcua.node-url"; + + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_KEY = + "connector.opcua.security-policy"; + public static final String SINK_OPC_UA_SECURITY_POLICY_KEY = "sink.opcua.security-policy"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE = "NONE"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE = + "BASIC128RSA15"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE = "BASIC256"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE = + "BASIC256SHA256"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE = + "AES128_SHA256_RSAOAEP"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE = + "AES256_SHA256_RSAPSS"; + + public static final List CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES = + Arrays.asList( + CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE, + CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE, + CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE); + + public static final String CONNECTOR_OPC_UA_HISTORIZING_KEY = "connector.opcua.historizing"; + public static final String SINK_OPC_UA_HISTORIZING_KEY = "sink.opcua.historizing"; + public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE = false; + + public static final String SINK_OPC_UA_TIMEOUT_SECONDS_KEY = "sink.opcua.timeout-seconds"; + public static final String CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY = + "connector.opcua.timeout-seconds"; + public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 10L; + public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable"; public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable"; public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true; diff --git a/pom.xml b/pom.xml index eb27c1694b4d8..e92cace16cf6f 100644 --- a/pom.xml +++ b/pom.xml @@ -410,6 +410,16 @@ checker-qual ${checker-qual.version} + + org.eclipse.milo + stack-client + ${milo.version} + + + org.eclipse.milo + sdk-client + ${milo.version} + io.airlift