From 3819d7167ff7c9dee00cbaf8c13051f683ce08d9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 14:20:43 +0800 Subject: [PATCH 1/3] Add mutual SSL support for pipe sinks --- .../auto/basic/IoTDBPipeMutualSSLIT.java | 174 ++++++++++++++++++ .../customizer/parameter/PipeParameters.java | 1 + .../parameter/PipeParametersTest.java | 12 ++ .../IoTDBConfigNodeSyncClientManager.java | 4 + .../sink/protocol/IoTDBConfigRegionSink.java | 4 + .../pipe/sink/IoTDBConfigRegionSinkTest.java | 46 +++++ .../IoTDBDataNodeSyncClientManager.java | 4 + .../protocol/legacy/IoTDBLegacyPipeSink.java | 58 +++++- .../async/IoTDBDataRegionAsyncSink.java | 27 ++- .../thrift/sync/IoTDBDataNodeSyncSink.java | 4 + .../iotdb/db/pipe/sink/PipeSinkTest.java | 119 ++++++++++++ .../config/constant/PipeSinkConstant.java | 10 + .../pipe/sink/client/IoTDBSyncClient.java | 17 +- .../sink/client/IoTDBSyncClientManager.java | 10 +- .../pipe/sink/protocol/IoTDBSslSyncSink.java | 62 +++++-- 15 files changed, 523 insertions(+), 29 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java new file mode 100644 index 0000000000000..44507bd9a6938 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.isession.SessionConfig; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; +import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeAutoBasic.class}) +public class IoTDBPipeMutualSSLIT extends AbstractPipeDualTreeModelAutoIT { + + private static final String STORE_PASSWORD = "thrift"; + + @Override + protected void setupConfig() { + super.setupConfig(); + + senderEnv + .getConfig() + .getCommonConfig() + .setTrustStorePath(trustStorePath()) + .setTrustStorePwd(STORE_PASSWORD) + .setSslProtocol(SessionConfig.DEFAULT_SSL_PROTOCOL); + receiverEnv + .getConfig() + .getCommonConfig() + .setEnableThriftClientSSL(true) + .setThriftSSLClientAuth(true) + .setKeyStorePath(keyStorePath()) + .setKeyStorePwd(STORE_PASSWORD) + .setTrustStorePath(trustStorePath()) + .setTrustStorePwd(STORE_PASSWORD) + .setSslProtocol(SessionConfig.DEFAULT_SSL_PROTOCOL); + } + + @Test + public void testPipeCanTransferWithMutualSSL() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.pipe_mtls.d1(time, s1) values (1, 11)", + "insert into root.pipe_mtls.d1(time, s1) values (2, 22)", + "flush"), + null); + + final Map sourceAttributes = new HashMap<>(); + final Map sinkAttributes = new HashMap<>(); + + sourceAttributes.put("source.realtime.mode", "log"); + sourceAttributes.put("source.user", "root"); + + sinkAttributes.put("sink", "iotdb-thrift-ssl-sink"); + sinkAttributes.put("sink.batch.enable", "false"); + sinkAttributes.put("sink.ip", receiverDataNode.getIp()); + sinkAttributes.put("sink.port", Integer.toString(receiverDataNode.getPort())); + sinkAttributes.put("sink.ssl.trust-store-path", trustStorePath()); + sinkAttributes.put("sink.ssl.trust-store-pwd", STORE_PASSWORD); + sinkAttributes.put("sink.ssl.key-store-path", keyStorePath()); + sinkAttributes.put("sink.ssl.key-store-pwd", STORE_PASSWORD); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(sourceAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList("insert into root.pipe_mtls.d1(time, s1) values (3, 33)", "flush"), + null); + + try (final Connection connection = + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + + receiverDataNode.getIpAndPortString() + + "?" + + Config.USE_SSL + + "=true&" + + Config.TRUST_STORE + + "=" + + trustStorePath() + + "&" + + Config.TRUST_STORE_PWD + + "=" + + STORE_PASSWORD + + "&" + + Config.KEY_STORE + + "=" + + keyStorePath() + + "&" + + Config.KEY_STORE_PWD + + "=" + + STORE_PASSWORD); + final Statement statement = connection.createStatement()) { + Awaitility.await() + .pollInSameThread() + .pollDelay(1L, TimeUnit.SECONDS) + .pollInterval(1L, TimeUnit.SECONDS) + .atMost(600, TimeUnit.SECONDS) + .untilAsserted( + () -> + TestUtils.assertResultSetEqual( + statement.executeQuery("select s1 from root.pipe_mtls.d1"), + "Time,root.pipe_mtls.d1.s1,", + Collections.unmodifiableSet( + new HashSet<>(Arrays.asList("1,11.0,", "2,22.0,", "3,33.0,"))))); + } + } + } + + private static String keyStorePath() { + return keyDir() + "test-keystore"; + } + + private static String trustStorePath() { + return keyDir() + "test-truststore"; + } + + private static String keyDir() { + return System.getProperty("user.dir") + + File.separator + + "target" + + File.separator + + "test-classes" + + File.separator; + } +} diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java index 7286d12099161..16a1329a6bfea 100644 --- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java +++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java @@ -430,6 +430,7 @@ public static class ValueHider { static { KEYS.add("ssl.trust-store-pwd"); + KEYS.add("ssl.key-store-pwd"); KEYS.add("scp.password"); KEYS.add("password"); } diff --git a/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java b/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java index d16b8e2879d4a..98eed48eace10 100644 --- a/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java +++ b/iotdb-api/pipe-api/src/test/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParametersTest.java @@ -39,4 +39,16 @@ public void keyReducerTest() { parameters.addAttribute("opcua.sink.value-name", "false"); Assert.assertNull(parameters.getString("value-name")); } + + @Test + public void valueHiderShouldHideSslKeyStorePassword() { + Assert.assertEquals( + "******", PipeParameters.ValueHider.hide("sink.ssl.key-store-pwd", "secret")); + Assert.assertEquals( + "******", PipeParameters.ValueHider.hide("connector.ssl.key-store-pwd", "secret")); + Assert.assertEquals("******", PipeParameters.ValueHider.hide("ssl.key-store-pwd", "secret")); + Assert.assertEquals( + "******", PipeParameters.ValueHider.hide("connector.ssl.trust-store-pwd", "secret")); + Assert.assertEquals("secret", PipeParameters.ValueHider.hide("ssl.key-store-path", "secret")); + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java index c579fad73c123..25c0bc58b6142 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java @@ -39,6 +39,8 @@ public IoTDBConfigNodeSyncClientManager( boolean useSSL, String trustStorePath, String trustStorePwd, + String keyStorePath, + String keyStorePwd, /* The following parameters are used locally. */ String loadBalanceStrategy, /* The following parameters are used to handshake with the receiver. */ @@ -54,6 +56,8 @@ public IoTDBConfigNodeSyncClientManager( useSSL, trustStorePath, trustStorePwd, + keyStorePath, + keyStorePwd, false, loadBalanceStrategy, userEntity, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java index 07e64321de78a..636e28a455e98 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java @@ -68,6 +68,8 @@ protected IoTDBSyncClientManager constructClient( final boolean useSSL, final String trustStorePath, final String trustStorePwd, + final String keyStorePath, + final String keyStorePwd, /* The following parameters are used locally. */ final boolean useLeaderCache, final String loadBalanceStrategy, @@ -84,6 +86,8 @@ protected IoTDBSyncClientManager constructClient( useSSL, Objects.nonNull(trustStorePath) ? ConfigNodeConfig.addHomeDir(trustStorePath) : null, trustStorePwd, + Objects.nonNull(keyStorePath) ? ConfigNodeConfig.addHomeDir(keyStorePath) : null, + keyStorePwd, loadBalanceStrategy, userEntity, password, diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/IoTDBConfigRegionSinkTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/IoTDBConfigRegionSinkTest.java index b88862c4cae08..01b7c1de66839 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/IoTDBConfigRegionSinkTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/IoTDBConfigRegionSinkTest.java @@ -51,4 +51,50 @@ public void testIoTDBSchemaConnector() { Assert.fail(); } } + + @Test + public void testIoTDBConfigRegionSinkAcceptsMutualSslParameters() { + try (IoTDBConfigRegionSink connector = new IoTDBConfigRegionSink()) { + connector.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, "keypwd"); + } + }))); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testIoTDBConfigRegionSinkRejectsIncompleteKeyStoreParameters() { + try (IoTDBConfigRegionSink connector = new IoTDBConfigRegionSink()) { + connector.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore"); + } + }))); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains(PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PWD_KEY)); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java index 841982accd3e6..df966c78accfa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java @@ -49,6 +49,8 @@ public IoTDBDataNodeSyncClientManager( final boolean useSSL, final String trustStorePath, final String trustStorePwd, + final String keyStorePath, + final String keyStorePwd, /* The following parameters are used locally. */ final boolean useLeaderCache, final String loadBalanceStrategy, @@ -65,6 +67,8 @@ public IoTDBDataNodeSyncClientManager( useSSL, trustStorePath, trustStorePwd, + keyStorePath, + keyStorePwd, useLeaderCache, loadBalanceStrategy, userEntity, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index c0a9eb6a79d65..973cb2f24efc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -81,6 +81,11 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SYNC_CONNECTOR_VERSION_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; @@ -90,6 +95,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SYNC_CONNECTOR_VERSION_KEY; @@ -110,6 +117,8 @@ public class IoTDBLegacyPipeSink implements PipeConnector { private boolean useSSL; private String trustStore; private String trustStorePwd; + private String keyStore; + private String keyStorePwd; private String user; private String password; @@ -151,9 +160,21 @@ public void validate(final PipeParameterValidator validator) throws Exception { SINK_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY), - parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false), - parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), - parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)); + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_ENABLE_KEY), false), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)) + .validate( + args -> (boolean) args[0] == (boolean) args[1], + String.format( + "%s and %s must be specified together", + SINK_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PATH_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY)); } private Set parseNodeUrls(final PipeParameters parameters) { @@ -206,9 +227,21 @@ public void customize( pipeName = configuration.getRuntimeEnvironment().getPipeName(); - useSSL = parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false); - trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); - trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); + useSSL = + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_ENABLE_KEY), false); + trustStore = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); + trustStorePwd = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); + keyStore = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PATH_KEY); + keyStorePwd = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY); final DataRegion dataRegion = StorageEngine.getInstance() @@ -233,7 +266,9 @@ public void handshake() throws Exception { port, useSSL, trustStore, - trustStorePwd); + trustStorePwd, + keyStore, + keyStorePwd); openClientSession(); final TSyncIdentityInfo identityInfo = new TSyncIdentityInfo( @@ -252,7 +287,7 @@ public void handshake() throws Exception { String.format(PipeConnectionException.CONNECTION_ERROR_FORMATTER, ipAddress, port), e); } - sessionPool = + final SessionPool.Builder builder = new SessionPool.Builder() .host(ipAddress) .port(port) @@ -261,8 +296,11 @@ public void handshake() throws Exception { .maxSize(1) .useSSL(useSSL) .trustStore(trustStore) - .trustStorePwd(trustStorePwd) - .build(); + .trustStorePwd(trustStorePwd); + if (keyStore != null) { + builder.keyStore(keyStore).keyStorePwd(keyStorePwd); + } + sessionPool = builder.build(); } private void openClientSession() throws TException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index f02f1be367149..de35cdaad2c85 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -93,10 +93,17 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_LEADER_CACHE_ENABLE_KEY; @@ -151,11 +158,23 @@ public void validate(final PipeParameterValidator validator) throws Exception { final PipeParameters parameters = validator.getParameters(); validator.validate( - args -> !((boolean) args[0] || (boolean) args[1] || (boolean) args[2]), + args -> + !((boolean) args[0] + || (boolean) args[1] + || (boolean) args[2] + || (boolean) args[3] + || (boolean) args[4]), "Only 'iotdb-thrift-ssl-sink' supports SSL transmission currently.", - parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false), - parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), - parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)); + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_ENABLE_KEY), false), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PATH_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY)); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java index 72486d6c4e7cd..ff32bbbb06fb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java @@ -43,6 +43,8 @@ protected IoTDBSyncClientManager constructClient( final boolean useSSL, final String trustStorePath, final String trustStorePwd, + final String keyStorePath, + final String keyStorePwd, /* The following parameters are used locally. */ final boolean useLeaderCache, final String loadBalanceStrategy, @@ -60,6 +62,8 @@ protected IoTDBSyncClientManager constructClient( useSSL, Objects.nonNull(trustStorePath) ? IoTDBConfig.addDataHomeDir(trustStorePath) : null, trustStorePwd, + Objects.nonNull(keyStorePath) ? IoTDBConfig.addDataHomeDir(keyStorePath) : null, + keyStorePwd, useLeaderCache, loadBalanceStrategy, userEntity, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index cf311639ee9a2..6ed4d5b296e68 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -109,6 +109,125 @@ public void testIoTDBThriftAsyncConnectorToOthers() { } } + @Test + public void testIoTDBThriftSyncSslSinkAcceptsMutualSslParameters() { + try (final IoTDBDataRegionSyncSink connector = new IoTDBDataRegionSyncSink()) { + connector.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.SINK_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName()); + put(PipeSinkConstant.SINK_IOTDB_IP_KEY, "127.0.0.1"); + put(PipeSinkConstant.SINK_IOTDB_PORT_KEY, "6668"); + put(PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore"); + put(PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd"); + put(PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore"); + put(PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PWD_KEY, "keypwd"); + } + }))); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testIoTDBThriftSyncSslConnectorAcceptsConnectorMutualSslAliases() { + try (final IoTDBDataRegionSyncSink connector = new IoTDBDataRegionSyncSink()) { + connector.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1"); + put(PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY, "6668"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, "keypwd"); + } + }))); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + @Test + public void testIoTDBThriftSyncSslSinkRejectsIncompleteKeyStoreParameters() { + try (final IoTDBDataRegionSyncSink connector = new IoTDBDataRegionSyncSink()) { + connector.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.SINK_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK.getPipePluginName()); + put(PipeSinkConstant.SINK_IOTDB_IP_KEY, "127.0.0.1"); + put(PipeSinkConstant.SINK_IOTDB_PORT_KEY, "6668"); + put(PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore"); + put(PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd"); + put(PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore"); + } + }))); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains(PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PWD_KEY)); + } + } + + @Test + public void testIoTDBThriftAsyncSinkRejectsSslKeyStoreParameters() { + try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + connector.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, "keypwd"); + } + }))); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Only 'iotdb-thrift-ssl-sink' supports SSL")); + } + } + + @Test + public void testIoTDBLegacyPipeSinkAcceptsConnectorMutualSslAliases() { + try (final IoTDBLegacyPipeSink connector = new IoTDBLegacyPipeSink()) { + connector.validate( + new PipeParameterValidator( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1"); + put(PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY, "6668"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_ENABLE_KEY, Boolean.TRUE.toString()); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, "truststore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, "trustpwd"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, "keystore"); + put(PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, "keypwd"); + } + }))); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + @Test public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception { try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index 39be064f09a41..aa09b712d6e1c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -50,9 +50,19 @@ public class PipeSinkConstant { public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls"; public static final String SINK_IOTDB_NODE_URLS_KEY = "sink.node-urls"; + public static final String CONNECTOR_IOTDB_SSL_ENABLE_KEY = "connector.ssl.enable"; public static final String SINK_IOTDB_SSL_ENABLE_KEY = "sink.ssl.enable"; + public static final String CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY = + "connector.ssl.trust-store-path"; public static final String SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY = "sink.ssl.trust-store-path"; + public static final String CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY = + "connector.ssl.trust-store-pwd"; public static final String SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY = "sink.ssl.trust-store-pwd"; + public static final String CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY = + "connector.ssl.key-store-path"; + public static final String SINK_IOTDB_SSL_KEY_STORE_PATH_KEY = "sink.ssl.key-store-path"; + public static final String CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY = "connector.ssl.key-store-pwd"; + public static final String SINK_IOTDB_SSL_KEY_STORE_PWD_KEY = "sink.ssl.key-store-pwd"; public static final String CONNECTOR_IOTDB_PARALLEL_TASKS_KEY = "connector.parallel.tasks"; public static final String SINK_IOTDB_PARALLEL_TASKS_KEY = "sink.parallel.tasks"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java index 1ad5d0a855f23..1897fda582a38 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java @@ -54,6 +54,19 @@ public IoTDBSyncClient( String trustStore, String trustStorePwd) throws TTransportException { + this(property, ipAddress, port, useSSL, trustStore, trustStorePwd, null, null); + } + + public IoTDBSyncClient( + ThriftClientProperty property, + String ipAddress, + int port, + boolean useSSL, + String trustStore, + String trustStorePwd, + String keyStore, + String keyStorePwd) + throws TTransportException { super( property .getProtocolFactory() @@ -64,7 +77,9 @@ public IoTDBSyncClient( port, property.getConnectionTimeoutMs(), trustStore, - trustStorePwd) + trustStorePwd, + keyStore, + keyStorePwd) : DeepCopyRpcTransportFactory.INSTANCE.getTransport( ipAddress, port, property.getConnectionTimeoutMs()))); this.ipAddress = ipAddress; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java index ff62dbf477b7a..bc95d630358a3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java @@ -57,6 +57,8 @@ public abstract class IoTDBSyncClientManager extends IoTDBClientManager implemen private final boolean useSSL; private final String trustStorePath; private final String trustStorePwd; + private final String keyStorePath; + private final String keyStorePwd; protected final Map> endPoint2ClientAndStatus = new ConcurrentHashMap<>(); @@ -71,6 +73,8 @@ protected IoTDBSyncClientManager( boolean useSSL, String trustStorePath, String trustStorePwd, + String keyStorePath, + String keyStorePwd, /* The following parameters are used locally. */ boolean useLeaderCache, String loadBalanceStrategy, @@ -96,6 +100,8 @@ protected IoTDBSyncClientManager( this.useSSL = useSSL; this.trustStorePath = trustStorePath; this.trustStorePwd = trustStorePwd; + this.keyStorePath = keyStorePath; + this.keyStorePwd = keyStorePwd; for (final TEndPoint endPoint : endPoints) { endPoint2ClientAndStatus.put(endPoint, new Pair<>(null, false)); @@ -205,7 +211,9 @@ private boolean initClientAndStatus( endPoint.getPort(), useSSL, trustStorePath, - trustStorePwd)); + trustStorePwd, + keyStorePath, + keyStorePwd)); return true; } catch (Exception e) { endPoint2HandshakeErrorMessage.put(endPoint, e.getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java index 6d3c05d3b4f21..aee5b6eed1f0b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java @@ -53,10 +53,17 @@ import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR; import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_CONNECTOR; import static org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin.IOTDB_THRIFT_SSL_SINK; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PATH_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_KEY_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_KEY; @@ -90,16 +97,30 @@ public void validate(final PipeParameterValidator validator) throws Exception { IOTDB_THRIFT_CONNECTOR.getPipePluginName()) .toLowerCase(); - validator.validate( - args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) args[2]), - String.format( - "When ssl transport is enabled, %s and %s must be specified", - SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY), - IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName) - || IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName) - || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false), - parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), - parameters.hasAttribute(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)); + validator + .validate( + args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) args[2]), + String.format( + "When ssl transport is enabled, %s and %s must be specified", + SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY), + IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName) + || IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName) + || parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_ENABLE_KEY), + false), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY)) + .validate( + args -> (boolean) args[0] == (boolean) args[1], + String.format( + "%s and %s must be specified together", + SINK_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PATH_KEY), + parameters.hasAnyAttributes( + CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY)); } @Override @@ -118,9 +139,20 @@ public void customize( final boolean useSSL = IOTDB_THRIFT_SSL_CONNECTOR.getPipePluginName().equals(userSpecifiedConnectorName) || IOTDB_THRIFT_SSL_SINK.getPipePluginName().equals(userSpecifiedConnectorName) - || parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false); - final String trustStorePath = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); - final String trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); + || parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SSL_ENABLE_KEY, SINK_IOTDB_SSL_ENABLE_KEY), false); + final String trustStorePath = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PATH_KEY, SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); + final String trustStorePwd = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_TRUST_STORE_PWD_KEY, SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); + final String keyStorePath = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_KEY_STORE_PATH_KEY, SINK_IOTDB_SSL_KEY_STORE_PATH_KEY); + final String keyStorePwd = + parameters.getStringByKeys( + CONNECTOR_IOTDB_SSL_KEY_STORE_PWD_KEY, SINK_IOTDB_SSL_KEY_STORE_PWD_KEY); // leader cache configuration final boolean useLeaderCache = @@ -134,6 +166,8 @@ public void customize( useSSL, trustStorePath, trustStorePwd, + keyStorePath, + keyStorePwd, useLeaderCache, loadBalanceStrategy, userEntity, @@ -150,6 +184,8 @@ protected abstract IoTDBSyncClientManager constructClient( final boolean useSSL, final String trustStorePath, final String trustStorePwd, + final String keyStorePath, + final String keyStorePwd, /* The following parameters are used locally. */ final boolean useLeaderCache, final String loadBalanceStrategy, From 85fad38685fe0dbfc51cc853781d45ca38f91d92 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 15:19:38 +0800 Subject: [PATCH 2/3] Fix JDBC default user handling --- .../src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java index 9a6dc91ceb4fe..7e2ef5c9b06a4 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java @@ -129,7 +129,7 @@ public IoTDBConnection(String url, Properties info) throws SQLException, TTransp } params = Utils.parseUrl(url, info); this.url = url; - this.userName = info.get("user").toString(); + this.userName = params.getUsername(); this.networkTimeout = params.getNetworkTimeout(); this.zoneId = ZoneId.of(params.getTimeZone()); this.charset = params.getCharset(); From eb63781f3b09e4878bae5b82824e68608cab2f04 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 16:33:08 +0800 Subject: [PATCH 3/3] Update IoTDBPipeMutualSSLIT.java --- .../it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java index 44507bd9a6938..6362cca567cc7 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeMutualSSLIT.java @@ -137,7 +137,9 @@ public void testPipeCanTransferWithMutualSSL() throws Exception { + "&" + Config.KEY_STORE_PWD + "=" - + STORE_PASSWORD); + + STORE_PASSWORD, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD); final Statement statement = connection.createStatement()) { Awaitility.await() .pollInSameThread()