From 01dff601a98c07371b665f0809285c0da1dd19fa Mon Sep 17 00:00:00 2001
From: Caideyipi <87789683+Caideyipi@users.noreply.github.com>
Date: Tue, 30 Dec 2025 11:55:04 +0800
Subject: [PATCH] Pipe: Implemented OPC Sink for outer server & Set
configuration and changed the default value of the server security policies &
Made the default quality configurable and does not throw when
non-value/quality measurement is encountered (#16944)
fix
Pipe: Fixed the OPC UA client connection problem (#17083)
* fix
* IT
(cherry picked from commit 82f7ca6dfc54435c8ead7327669c888b1edbeeba)
spt
Optimized the logger when table does not exist in DN heartbeat && Pipe: Fixed the OPC UA Sink key getter logic and potentail NPE when closing client && Load: Fixed the missing schema writing for "root" table (#17063)
* root-fix
* f
* fix
* rest
* spls
* gsa
* fix
(cherry picked from commit 5101489d4126b40120d72d49ee5a58edd23aec22)
fix
Pipe: Implemented OPC Sink for outer server & Set configuration and changed the default value of the server security policies & Made the default quality configurable and does not throw when non-value/quality measurement is encountered (#16944)
* pj
* cj
* bone
* fix
* fix
* framework
* fix
* trilog
* framework
* fix
* fix
* yl
* stack-client
* fix
* might
* sleep-removal
* cleaning
* fix
* sec-dir
* cleaning
* remove-poison
* f
* fix
* clean-sit
* sit-comp
* object
* many-clean
* sit-sit
* fix
* fix
* fix
* ref
* sit
* partial
* security-policies
* check-equals
* check-err
* fix
* compile-fix
* adjust
* ut
* refactor
* fix_and_IT
* fix
* placeholder
* rollback
* eliminate-fault
* pw
* fix
* f
* fix
(cherry picked from commit cb18a95fc014c116d3e0041a753d5493432189bb)
---
integration-test/pom.xml | 1 -
.../pipe/it/single/AbstractPipeSingleIT.java | 2 +-
.../pipe/it/single/IoTDBPipeOPCUAIT.java | 261 ++++++++++++--
.../customizer/parameter/PipeParameters.java | 51 ++-
iotdb-core/datanode/pom.xml | 12 +
.../pipe/sink/protocol/opcua/OpcUaSink.java | 308 +++++++++++++++-
.../protocol/opcua/client/ClientRunner.java | 159 +++++++++
.../client/IoTDBKeyStoreLoaderClient.java | 127 +++++++
.../opcua/client/IoTDBOpcUaClient.java | 335 ++++++++++++++++++
.../{ => server}/OpcUaKeyStoreLoader.java | 2 +-
.../opcua/{ => server}/OpcUaNameSpace.java | 173 ++++++---
.../{ => server}/OpcUaServerBuilder.java | 97 ++---
.../iotdb/db/pipe/sink/PipeSinkTest.java | 89 +++++
.../config/constant/PipeSinkConstant.java | 52 +++
pom.xml | 10 +
15 files changed, 1530 insertions(+), 149 deletions(-)
create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/{ => server}/OpcUaKeyStoreLoader.java (98%)
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/{ => server}/OpcUaNameSpace.java (70%)
rename iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/{ => server}/OpcUaServerBuilder.java (82%)
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 8c2f741692900..43b60fc371b23 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -185,7 +185,6 @@
org.bouncycastle
bcprov-jdk18on
- test
junit
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index ea41dc9b579ea..dded7cbce3087 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {
@Before
public void setUp() {
- MultiEnvFactory.createEnv(2);
+ MultiEnvFactory.createEnv(1);
env = MultiEnvFactory.getEnv(0);
env.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
// 10 min, assert that the operations will not time out
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 645fbdbaa68bf..599a43476c1fd 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -20,67 +20,280 @@
package org.apache.iotdb.pipe.it.single;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.io.File;
+import java.net.ConnectException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
@Test
- public void testOPCUASink() throws Exception {
+ public void testOPCUAServerSink() throws Exception {
+ int tcpPort = -1;
+ int httpsPort = -1;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
- TestUtils.executeNonQuery(env, "insert into root.db.d1(time,s1) values (1,1)", null);
+ TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
- final Map connectorAttributes = new HashMap<>();
- connectorAttributes.put("sink", "opc-ua-sink");
- connectorAttributes.put("opcua.model", "client-server");
+ final Map sinkAttributes = new HashMap<>();
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- client
- .createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(Collections.emptyMap())
- .setProcessorAttributes(Collections.emptyMap()))
- .getCode());
+ sinkAttributes.put("sink", "opc-ua-sink");
+ sinkAttributes.put("model", "client-server");
+ sinkAttributes.put("opcua.security-policy", "None");
+
+ OpcUaClient opcUaClient;
+ DataValue value;
+ while (true) {
+ final int[] ports = EnvUtils.searchAvailablePorts();
+ tcpPort = ports[0];
+ httpsPort = ports[1];
+ sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
+ sinkAttributes.put("https.port", Integer.toString(httpsPort));
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .createPipe(
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(Collections.singletonMap("user", "root"))
+ .setProcessorAttributes(Collections.emptyMap()))
+ .getCode());
+
+ try {
+ opcUaClient =
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
+ } catch (final PipeException e) {
+ if (e.getCause() instanceof ConnectException) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+ opcUaClient.disconnect().get();
+ break;
+ }
+
+ // Create the region first to avoid tsFile parsing
+ TestUtils.executeNonQueries(
+ env,
+ Arrays.asList(
+ "create aligned timeSeries root.db.opc(value double, quality boolean, other int32)",
+ "create aligned timeSeries root.db.opc1(value double, quality boolean, other int32)",
+ "create aligned timeSeries root.db.opc2(value double, quality boolean, other int32)",
+ "insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)",
+ "insert into root.db.opc1(time, value, quality, other) values (0, 0, true, 1)",
+ "insert into root.db.opc2(time, value, quality, other) values (0, 0, true, 1)"),
+ null);
+
+ while (true) {
+ final int[] ports = EnvUtils.searchAvailablePorts();
+ tcpPort = ports[0];
+ httpsPort = ports[1];
+ sinkAttributes.put("opcua.tcp.port", Integer.toString(tcpPort));
+ sinkAttributes.put("https.port", Integer.toString(httpsPort));
+ sinkAttributes.put("with-quality", "true");
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq()
+ .setPipeName("testPipe")
+ .setIsReplaceAllConnectorAttributes(true)
+ .setConnectorAttributes(sinkAttributes)
+ .setProcessorAttributes(Collections.emptyMap())
+ .setExtractorAttributes(Collections.emptyMap()))
+ .getCode());
+ try {
+ opcUaClient =
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:" + tcpPort + "/iotdb", SecurityPolicy.None, "root", "root");
+ } catch (final PipeException e) {
+ if (e.getCause() instanceof ConnectException) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+ break;
+ }
+
+ // Test multiple regions
+ TestUtils.executeNonQueries(
+ env,
+ Arrays.asList(
+ "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
+ "insert into root.db.opc1(time, value, quality, other) values (1, 1, false, 1)",
+ "insert into root.db.opc2(time, value, quality, other) values (1, 1, false, 1)"),
+ null);
+
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ try {
+ value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+
+ value =
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc1"))
+ .get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+
+ value =
+ opcUaClient
+ .readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc2"))
+ .get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+ break;
+ } catch (final Throwable t) {
+ if (System.currentTimeMillis() - startTime > 10_000L) {
+ throw t;
+ }
+ }
+ }
+
+ TestUtils.executeNonQuery(
+ env, "insert into root.db.opc(time, quality) values (2, true)", null);
+ TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);
+
+ startTime = System.currentTimeMillis();
+ while (true) {
+ try {
+ value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
+ Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
+ Assert.assertEquals(new Variant(2.0), value.getValue());
+ Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
+ break;
+ } catch (final Throwable t) {
+ if (System.currentTimeMillis() - startTime > 10_000L) {
+ throw t;
+ }
+ }
+ }
+
+ opcUaClient.disconnect().get();
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
// Test reconstruction
- connectorAttributes.put("password", "test");
+ sinkAttributes.put("password", "test");
+ sinkAttributes.put("security-policy", "basic256sha256");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
+ new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
+ // Banned none, only allows basic256sha256
+ final int finalTcpPort = tcpPort;
+ Assert.assertThrows(
+ PipeException.class,
+ () ->
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:" + finalTcpPort + "/iotdb",
+ SecurityPolicy.None,
+ "root",
+ "root"));
+
// Test conflict
- connectorAttributes.put("password", "conflict");
- Assert.assertEquals(
- TSStatusCode.PIPE_ERROR.getStatusCode(),
- client
- .createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(Collections.emptyMap())
- .setProcessorAttributes(Collections.emptyMap()))
- .getCode());
+ sinkAttributes.put("password", "conflict");
+ try {
+ TestUtils.executeNonQuery(
+ env,
+ String.format(
+ "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict@pswd', 'tcp.port'='%s', 'https.port'='%s')",
+ tcpPort, httpsPort),
+ null);
+ Assert.fail();
+ } catch (final Exception e) {
+ Assert.assertEquals(
+ String.format(
+ "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port %s and https port %s's password **** conflicts to the new password ****, reject reusing.",
+ tcpPort, httpsPort),
+ e.getMessage());
+ }
+ } finally {
+ if (tcpPort >= 0) {
+ final String lockPath = EnvUtils.getLockFilePath(tcpPort);
+ if (!new File(lockPath).delete()) {
+ System.out.printf("Delete lock file %s failed%n", lockPath);
+ }
+ }
}
}
+
+ private static OpcUaClient getOpcUaClient(
+ final String nodeUrl,
+ final SecurityPolicy policy,
+ final String userName,
+ final String password) {
+ final IoTDBOpcUaClient client;
+
+ final IdentityProvider provider =
+ Objects.nonNull(userName)
+ ? new UsernameProvider(userName, password)
+ : new AnonymousProvider();
+
+ final String securityDir =
+ CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+ + File.separatorChar
+ + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
+
+ client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
+ new ClientRunner(client, securityDir, password, userName, 10).run();
+ return client.getClient();
+ }
}
diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 3dcb2d19b0a78..b6d59c827f000 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -59,7 +59,9 @@ public Map getAttribute() {
}
public boolean hasAttribute(final String key) {
- return attributes.containsKey(key) || attributes.containsKey(KeyReducer.reduce(key));
+ return attributes.containsKey(key)
+ || attributes.containsKey(KeyReducer.shallowReduce(key))
+ || attributes.containsKey(KeyReducer.reduce(key));
}
public boolean hasAnyAttributes(final String... keys) {
@@ -76,7 +78,11 @@ public void addAttribute(final String key, String values) {
}
public String getString(final String key) {
- final String value = attributes.get(key);
+ String value = attributes.get(key);
+ if (Objects.nonNull(value)) {
+ return value;
+ }
+ value = attributes.get(KeyReducer.shallowReduce(key));
return value != null ? value : attributes.get(KeyReducer.reduce(key));
}
@@ -350,29 +356,52 @@ public PipeParameters addOrReplaceEquivalentAttributesWithClone(final PipeParame
}
private static class KeyReducer {
-
- private static final Set PREFIXES = new HashSet<>();
+ private static final Set FIRST_PREFIXES = new HashSet<>();
+ private static final Set SECOND_PREFIXES = new HashSet<>();
static {
- PREFIXES.add("extractor.");
- PREFIXES.add("source.");
- PREFIXES.add("processor.");
- PREFIXES.add("connector.");
- PREFIXES.add("sink.");
+ FIRST_PREFIXES.add("extractor.");
+ FIRST_PREFIXES.add("source.");
+ FIRST_PREFIXES.add("processor.");
+ FIRST_PREFIXES.add("connector.");
+ FIRST_PREFIXES.add("sink.");
+
+ SECOND_PREFIXES.add("opcua.");
}
- static String reduce(final String key) {
+ static String shallowReduce(String key) {
if (key == null) {
return null;
}
final String lowerCaseKey = key.toLowerCase();
- for (final String prefix : PREFIXES) {
+ for (final String prefix : FIRST_PREFIXES) {
if (lowerCaseKey.startsWith(prefix)) {
return key.substring(prefix.length());
}
}
return key;
}
+
+ public static String reduce(String key) {
+ if (key == null) {
+ return null;
+ }
+ String lowerCaseKey = key.toLowerCase();
+ for (final String prefix : FIRST_PREFIXES) {
+ if (lowerCaseKey.startsWith(prefix)) {
+ key = key.substring(prefix.length());
+ lowerCaseKey = lowerCaseKey.substring(prefix.length());
+ break;
+ }
+ }
+ for (final String prefix : SECOND_PREFIXES) {
+ if (lowerCaseKey.startsWith(prefix)) {
+ key = key.substring(prefix.length());
+ break;
+ }
+ }
+ return key;
+ }
}
public static class ValueHider {
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 2a019e1fc436b..175a77e746e92 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -206,6 +206,18 @@
org.checkerframework
checker-qual
+
+ org.eclipse.milo
+ stack-client
+
+
+ org.eclipse.milo
+ sdk-client
+
+
+ org.bouncycastle
+ bcprov-jdk18on
+
org.eclipse.jetty
jetty-http
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 5ce35e750538e..e198506122474 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -22,6 +22,10 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -30,44 +34,86 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
/**
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data are converted into
@@ -83,9 +129,21 @@ public class OpcUaSink implements PipeConnector {
private static final Map>
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>();
+ private static final Map>
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();
private String serverKey;
- private OpcUaNameSpace nameSpace;
+ private String nodeUrl;
+ private boolean isClientServerModel;
+ private @Nullable String valueName;
+ private @Nullable String qualityName;
+ private StatusCode defaultQuality;
+
+ // Inner server
+ private @Nullable OpcUaNameSpace nameSpace;
+
+ // Outer server
+ private @Nullable IoTDBOpcUaClient client;
@Override
public void validate(final PipeParameterValidator validator) throws Exception {
@@ -104,13 +162,71 @@ public void validate(final PipeParameterValidator validator) throws Exception {
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
false);
- ;
+
+ final PipeParameters parameters = validator.getParameters();
+ if (validator
+ .getParameters()
+ .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY)
+ || parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
+ CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE)) {
+ validator.validate(
+ CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals,
+ String.format(
+ "When the OPC UA sink points to an outer server or sets 'with-quality' to true, the %s or %s must be %s.",
+ CONNECTOR_OPC_UA_MODEL_KEY,
+ SINK_OPC_UA_MODEL_KEY,
+ CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
+ }
}
@Override
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
throws Exception {
+ final boolean withQuality =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
+ CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
+ valueName =
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY),
+ CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
+ : null;
+ qualityName =
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY),
+ CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
+ : null;
+ defaultQuality =
+ getQuality(
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY, SINK_OPC_UA_DEFAULT_QUALITY_KEY),
+ CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE)
+ : CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE);
+ isClientServerModel =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+ .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
+
+ nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY);
+ if (Objects.isNull(nodeUrl)) {
+ customizeServer(parameters);
+ } else {
+ customizeClient(parameters);
+ }
+ }
+
+ private void customizeServer(final PipeParameters parameters) {
final int tcpBindPort =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -147,6 +263,21 @@ public void customize(
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
+ final Set securityPolicies =
+ (parameters.hasAnyAttributes(
+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY)
+ ? Arrays.stream(
+ parameters
+ .getStringByKeys(
+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY)
+ .replace(" ", "")
+ .split(","))
+ : CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES.stream())
+ .map(this::getSecurityPolicy)
+ .collect(Collectors.toSet());
+ if (securityPolicies.isEmpty()) {
+ throw new PipeException("The security policy cannot be empty.");
+ }
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -165,25 +296,22 @@ public void customize(
.setUser(user)
.setPassword(password)
.setSecurityDir(securityDir)
- .setEnableAnonymousAccess(enableAnonymousAccess);
+ .setEnableAnonymousAccess(enableAnonymousAccess)
+ .setSecurityPolicies(securityPolicies);
final OpcUaServer newServer = builder.build();
- nameSpace =
- new OpcUaNameSpace(
- newServer,
- parameters
- .getStringOrDefault(
- Arrays.asList(
- CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
- CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
- .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
- builder);
+ nameSpace = new OpcUaNameSpace(newServer, builder);
nameSpace.startup();
newServer.startup().get();
return new Pair<>(new AtomicInteger(0), nameSpace);
} else {
oldValue
.getRight()
- .checkEquals(user, password, securityDir, enableAnonymousAccess);
+ .checkEquals(
+ user,
+ password,
+ securityDir,
+ enableAnonymousAccess,
+ securityPolicies);
return oldValue;
}
} catch (final PipeException e) {
@@ -197,6 +325,104 @@ public void customize(
}
}
+ private void customizeClient(final PipeParameters parameters) {
+ final SecurityPolicy policy =
+ getSecurityPolicy(
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY),
+ CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE)
+ .toUpperCase());
+
+ final IdentityProvider provider;
+ final String userName =
+ parameters.getStringByKeys(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY);
+ final String password =
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY),
+ CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+ provider =
+ Objects.nonNull(userName)
+ ? new UsernameProvider(userName, password)
+ : new AnonymousProvider();
+
+ final String securityDir =
+ IoTDBConfig.addDataHomeDir(
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY),
+ CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+ + File.separatorChar
+ + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));
+
+ final long timeoutSeconds =
+ parameters.getLongOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY, SINK_OPC_UA_TIMEOUT_SECONDS_KEY),
+ CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE);
+
+ synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+ client =
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP
+ .compute(
+ nodeUrl,
+ (key, oldValue) -> {
+ if (Objects.isNull(oldValue)) {
+ final IoTDBOpcUaClient result =
+ new IoTDBOpcUaClient(
+ nodeUrl,
+ policy,
+ provider,
+ parameters.getBooleanOrDefault(
+ Arrays.asList(
+ CONNECTOR_OPC_UA_HISTORIZING_KEY,
+ SINK_OPC_UA_HISTORIZING_KEY),
+ CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
+ final ClientRunner runner =
+ new ClientRunner(result, securityDir, password, userName, timeoutSeconds);
+ runner.run();
+ return new Pair<>(new AtomicInteger(0), result);
+ }
+ oldValue.getRight().checkEquals(userName, password, securityDir, policy);
+ return oldValue;
+ })
+ .getRight();
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl).getLeft().incrementAndGet();
+ }
+ }
+
+ private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
+ switch (securityPolicy.toUpperCase()) {
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE:
+ return SecurityPolicy.None;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE:
+ return SecurityPolicy.Basic128Rsa15;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE:
+ return SecurityPolicy.Basic256;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE:
+ return SecurityPolicy.Basic256Sha256;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE:
+ return SecurityPolicy.Aes128_Sha256_RsaOaep;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE:
+ return SecurityPolicy.Aes256_Sha256_RsaPss;
+ default:
+ throw new PipeException(
+ "The security policy can only be 'None', 'Basic128Rsa15', 'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or 'Aes256_Sha256_RsaPss'.");
+ }
+ }
+
+ private StatusCode getQuality(final String quality) {
+ switch (quality.toUpperCase()) {
+ case CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE:
+ return StatusCode.GOOD;
+ case CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE:
+ return StatusCode.BAD;
+ case CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE:
+ return StatusCode.UNCERTAIN;
+ default:
+ throw new PipeException("The default quality can only be 'GOOD', 'BAD' or 'UNCERTAIN'.");
+ }
+ }
+
@Override
public void handshake() throws Exception {
// Server side, do nothing
@@ -214,7 +440,19 @@ public void transfer(final Event event) throws Exception {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
- transferByTablet(tabletInsertionEvent, LOGGER, tablet -> nameSpace.transfer(tablet));
+ transferByTablet(
+ tabletInsertionEvent,
+ LOGGER,
+ tablet -> {
+ if (Objects.nonNull(nameSpace)) {
+ nameSpace.transfer(tablet, this);
+ } else if (Objects.nonNull(client)) {
+ client.transfer(tablet, this);
+ } else {
+ throw new PipeException(
+ "No OPC client or server is specified when transferring tablet");
+ }
+ });
}
public static void transferByTablet(
@@ -299,5 +537,45 @@ public void close() throws Exception {
}
}
}
+
+ if (nodeUrl == null) {
+ return;
+ }
+
+ synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
+ final Pair pair =
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl);
+ if (pair == null) {
+ return;
+ }
+
+ if (pair.getLeft().decrementAndGet() <= 0) {
+ try {
+ pair.getRight().disconnect();
+ } finally {
+ CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl);
+ }
+ }
+ }
+ }
+
+ /////////////////////////////// Getter ///////////////////////////////
+
+ public boolean isClientServerModel() {
+ return isClientServerModel;
+ }
+
+ @Nullable
+ public String getValueName() {
+ return valueName;
+ }
+
+ @Nullable
+ public String getQualityName() {
+ return qualityName;
+ }
+
+ public StatusCode getDefaultQuality() {
+ return defaultQuality;
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
new file mode 100644
index 0000000000000..9e3b46d463a17
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+import java.util.Objects;
+
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class ClientRunner {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class);
+
+ static {
+ // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+ Security.addProvider(new BouncyCastleProvider());
+ }
+
+ private final IoTDBOpcUaClient configurableUaClient;
+ private final Path securityDir;
+ private final String password;
+ private final long timeoutSeconds;
+
+ // For conflict checking
+ private final String user;
+
+ public ClientRunner(
+ final IoTDBOpcUaClient configurableUaClient,
+ final String securityDir,
+ final String password,
+ final String user,
+ final long timeoutSeconds) {
+ this.configurableUaClient = configurableUaClient;
+ this.securityDir = Paths.get(securityDir);
+ this.password = password;
+ this.user = user;
+ this.timeoutSeconds = timeoutSeconds;
+ configurableUaClient.setRunner(this);
+ }
+
+ private OpcUaClient createClient() throws Exception {
+ Files.createDirectories(securityDir);
+ if (!Files.exists(securityDir)) {
+ throw new Exception("unable to create security dir: " + securityDir);
+ }
+
+ final File pkiDir = securityDir.resolve("pki").toFile();
+
+ logger.info("security dir: {}", securityDir.toAbsolutePath());
+ logger.info("security pki dir: {}", pkiDir.getAbsolutePath());
+
+ final IoTDBKeyStoreLoaderClient loader =
+ new IoTDBKeyStoreLoaderClient().load(securityDir, password.toCharArray());
+
+ final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir);
+
+ final DefaultClientCertificateValidator certificateValidator =
+ new DefaultClientCertificateValidator(trustListManager);
+
+ return OpcUaClient.create(
+ configurableUaClient.getNodeUrl(),
+ endpoints -> endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(),
+ configBuilder ->
+ configBuilder
+ .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA client"))
+ .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+ .setKeyPair(loader.getClientKeyPair())
+ .setCertificate(loader.getClientCertificate())
+ .setCertificateChain(loader.getClientCertificateChain())
+ .setCertificateValidator(certificateValidator)
+ .setIdentityProvider(configurableUaClient.getIdentityProvider())
+ .setRequestTimeout(uint(timeoutSeconds * 1000L))
+ .setConnectTimeout(uint(timeoutSeconds * 1000L))
+ .setMaxResponseMessageSize(uint(0))
+ .build());
+ }
+
+ public void run() {
+ try {
+ final OpcUaClient client = createClient();
+
+ try {
+ configurableUaClient.run(client);
+ } catch (final Exception e) {
+ throw new PipeException(
+ "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ } catch (final Exception e) {
+ throw new PipeException(
+ "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+
+ long getTimeoutSeconds() {
+ return timeoutSeconds;
+ }
+
+ /////////////////////////////// Conflict detection ///////////////////////////////
+
+ void checkEquals(
+ final String user,
+ final String password,
+ final Path securityDir,
+ final SecurityPolicy securityPolicy) {
+ checkEquals("user", this.user, user);
+ checkEquals("password", this.password, password);
+ checkEquals(
+ "security dir",
+ FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
+ FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
+ checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(), securityPolicy);
+ }
+
+ private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) {
+ if (!Objects.equals(thisAttr, thatAttr)) {
+ if (attrName.equals("password")) {
+ thisAttr = "****";
+ thatAttr = "****";
+ }
+ throw new PipeException(
+ String.format(
+ "The existing server with nodeUrl %s's %s %s conflicts to the new %s %s, reject reusing.",
+ configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName, thatAttr));
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 0000000000000..bfaf378822c31
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+class IoTDBKeyStoreLoaderClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class);
+
+ private static final Pattern IP_ADDR_PATTERN =
+ Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+ private static final String CLIENT_ALIAS = "client-ai";
+
+ private X509Certificate[] clientCertificateChain;
+ private X509Certificate clientCertificate;
+ private KeyPair clientKeyPair;
+
+ IoTDBKeyStoreLoaderClient load(final Path baseDir, final char[] password) throws Exception {
+ final KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+ final Path serverKeyStore = baseDir.resolve("iotdb-client.pfx");
+
+ logger.info("Loading KeyStore at {}.", serverKeyStore);
+
+ if (!Files.exists(serverKeyStore)) {
+ keyStore.load(null, password);
+
+ final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+ final SelfSignedCertificateBuilder builder =
+ new SelfSignedCertificateBuilder(keyPair)
+ .setCommonName("Apache IoTDB OPC UA client")
+ .setOrganization("Apache")
+ .setOrganizationalUnit("dev")
+ .setLocalityName("Beijing")
+ .setStateName("China")
+ .setCountryCode("CN")
+ .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+ .addDnsName("localhost")
+ .addIpAddress("127.0.0.1");
+
+ // Get as many hostnames and IP addresses as we can listed in the certificate.
+ for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
+ if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+ builder.addIpAddress(hostname);
+ } else {
+ builder.addDnsName(hostname);
+ }
+ }
+
+ final X509Certificate certificate = builder.build();
+
+ keyStore.setKeyEntry(
+ CLIENT_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
+ try (OutputStream out = Files.newOutputStream(serverKeyStore)) {
+ keyStore.store(out, password);
+ }
+ } else {
+ try (InputStream in = Files.newInputStream(serverKeyStore)) {
+ keyStore.load(in, password);
+ }
+ }
+
+ final Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, password);
+ if (clientPrivateKey instanceof PrivateKey) {
+ clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS);
+
+ clientCertificateChain =
+ Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+ .map(X509Certificate.class::cast)
+ .toArray(X509Certificate[]::new);
+
+ final PublicKey serverPublicKey = clientCertificate.getPublicKey();
+ clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) clientPrivateKey);
+ }
+
+ return this;
+ }
+
+ X509Certificate getClientCertificate() {
+ return clientCertificate;
+ }
+
+ public X509Certificate[] getClientCertificateChain() {
+ return clientCertificateChain;
+ }
+
+ KeyPair getClientKeyPair() {
+ return clientKeyPair;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
new file mode 100644
index 0000000000000..2019c0fe83336
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.ValueRanks;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.UaException;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
+import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
+
+import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
+import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout;
+
+public class IoTDBOpcUaClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
+
+ // Customized nodes
+ private static final int NAME_SPACE_INDEX = 2;
+
+ // Useless for a server only accept client writing
+ private static final double SAMPLING_INTERVAL_PLACEHOLDER = 500;
+ private final String nodeUrl;
+
+ private final SecurityPolicy securityPolicy;
+ private final IdentityProvider identityProvider;
+ private OpcUaClient client;
+ private final boolean historizing;
+ private ClientRunner runner;
+
+ public IoTDBOpcUaClient(
+ final String nodeUrl,
+ final SecurityPolicy securityPolicy,
+ final IdentityProvider identityProvider,
+ final boolean historizing) {
+ this.nodeUrl = nodeUrl;
+ this.securityPolicy = securityPolicy;
+ this.identityProvider = identityProvider;
+ this.historizing = historizing;
+ }
+
+ public void run(final OpcUaClient client) throws Exception {
+ // synchronous connect
+ this.client = client;
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds() * 1000L) {
+ try {
+ client.connect().get();
+ } catch (final ExecutionException e) {
+ if (e.getCause() instanceof UaException
+ && ((UaException) e.getCause()).getStatusCode().getValue() == Bad_Timeout) {
+ Thread.sleep(1000L);
+ continue;
+ }
+ throw e;
+ }
+ break;
+ }
+ }
+
+ // Only support tree model & client-server
+ public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception {
+ OpcUaNameSpace.transferTabletForClientServerModel(
+ tablet, sink, this::transferTabletRowForClientServerModel);
+ }
+
+ private void transferTabletRowForClientServerModel(
+ final String[] segments,
+ final List measurementSchemas,
+ final List timestamps,
+ final List
+
+ org.eclipse.milo
+ stack-client
+ ${milo.version}
+
+
+ org.eclipse.milo
+ sdk-client
+ ${milo.version}
+
io.airlift