diff --git a/fluss-filesystems/fluss-fs-cos/pom.xml b/fluss-filesystems/fluss-fs-cos/pom.xml new file mode 100644 index 0000000000..c3629658d1 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/pom.xml @@ -0,0 +1,250 @@ + + + + + 4.0.0 + + org.apache.fluss + fluss-filesystems + 0.10-SNAPSHOT + + + fluss-fs-cos + Fluss : FileSystems : COS FS + + + 3.3.5 + 5.6.139 + + + + + org.apache.fluss + fluss-common + ${project.version} + provided + + + + org.apache.fluss + fluss-fs-hadoop-shaded + ${project.version} + + + + org.apache.fluss + fluss-fs-hadoop + ${project.version} + + + + org.apache.hadoop + hadoop-cos + ${fs.cosn.sdk.version} + + + com.qcloud + cos_api + + + org.apache.hadoop + hadoop-common + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-reload4j + + + + + + com.qcloud + cos_api + ${fs.cos.api.version} + + + javax.xml.bind + jaxb-api + + + + + + + javax.xml.bind + jaxb-api + ${jaxb.api.version} + + provided + + + + org.apache.fluss + fluss-test-utils + + + + org.apache.fluss + fluss-common + ${project.version} + test + test-jar + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + default-jar + + jar + + + + test-jar + + test-jar + + + + + + + + true + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-javax-jars + process-resources + + copy + + + + + + + javax.xml.bind + jaxb-api + ${jaxb.api.version} + jar + true + + + ${project.build.directory}/temporary + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + unpack-javax-libraries + process-resources + + run + + + + + + + + + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-fluss + package + + shade + + + + + *:* + + + javax.servlet:servlet-api + xmlenc:xmlenc + + + + + * + + .gitkeep + mime.types + mozilla/** + LICENSE.txt + license/LICENSE* + okhttp3/internal/publicsuffix/NOTICE + NOTICE + + + + org.apache.fluss:fluss-fs-hadoop + + META-INF/** + + + + + + org.apache.commons + org.apache.fluss.shaded.org.apache.commons + + + + + + + + + + + diff --git a/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystem.java b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystem.java new file mode 100644 index 0000000000..28c5cc771a --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystem.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos; + +import org.apache.fluss.fs.cos.token.COSSecurityTokenProvider; +import org.apache.fluss.fs.hdfs.HadoopFileSystem; +import org.apache.fluss.fs.token.ObtainedSecurityToken; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +import java.io.IOException; + +/** + * A {@link FileSystem} for Tencent Cloud COS that wraps an {@link HadoopFileSystem}, but overwrite + * method to generate access security token. + */ +class COSFileSystem extends HadoopFileSystem { + + private final Configuration conf; + private volatile COSSecurityTokenProvider cosSecurityTokenProvider; + private final String scheme; + + COSFileSystem(FileSystem hadoopFileSystem, String scheme, Configuration conf) { + super(hadoopFileSystem); + this.scheme = scheme; + this.conf = conf; + } + + @Override + public ObtainedSecurityToken obtainSecurityToken() throws IOException { + try { + mayCreateSecurityTokenProvider(); + return cosSecurityTokenProvider.obtainSecurityToken(scheme); + } catch (Exception e) { + throw new IOException(e); + } + } + + private void mayCreateSecurityTokenProvider() throws IOException { + if (cosSecurityTokenProvider == null) { + synchronized (this) { + if (cosSecurityTokenProvider == null) { + cosSecurityTokenProvider = new COSSecurityTokenProvider(conf); + } + } + } + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystemPlugin.java b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystemPlugin.java new file mode 100644 index 0000000000..9418a36113 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystemPlugin.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigBuilder; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FileSystemPlugin; +import org.apache.fluss.fs.cos.token.COSSecurityTokenReceiver; + +import org.apache.hadoop.fs.cosn.CosNFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** Simple factory for the Tencent Cloud COS file system. */ +public class COSFileSystemPlugin implements FileSystemPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(COSFileSystemPlugin.class); + + public static final String SCHEME = "cosn"; + + /** + * In order to simplify, we make fluss cos configuration keys same with hadoop cos module. So, + * we add all configuration key with prefix `fs.cosn` in fluss conf to hadoop conf. + */ + private static final String[] FLUSS_CONFIG_PREFIXES = {"fs.cosn."}; + + public static final String SECRET_ID = "fs.cosn.userinfo.secretId"; + public static final String SECRET_KEY = "fs.cosn.userinfo.secretKey"; + public static final String CREDENTIALS_PROVIDER = "fs.cosn.credentials.provider"; + + public static final String ENDPOINT_KEY = "fs.cosn.endpoint"; + + @Override + public String getScheme() { + return SCHEME; + } + + @Override + public FileSystem create(URI fsUri, Configuration flussConfig) throws IOException { + org.apache.hadoop.conf.Configuration hadoopConfig = getHadoopConfiguration(flussConfig); + + // set credential provider + if (hadoopConfig.get(SECRET_ID) == null) { + String credentialsProvider = hadoopConfig.get(CREDENTIALS_PROVIDER); + if (credentialsProvider != null) { + LOG.info( + "{} is not set, but {} is set, using credential provider {}.", + SECRET_ID, + CREDENTIALS_PROVIDER, + credentialsProvider); + } else { + // no secretId, no credentialsProvider, + // set default credential provider which will get token from + // COSSecurityTokenReceiver + setDefaultCredentialProvider(hadoopConfig); + } + } else { + LOG.info("{} is set, using provided secret id and secret key.", SECRET_ID); + } + + final String scheme = fsUri.getScheme(); + final String authority = fsUri.getAuthority(); + + if (scheme == null && authority == null) { + fsUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig); + } else if (scheme != null && authority == null) { + URI defaultUri = org.apache.hadoop.fs.FileSystem.getDefaultUri(hadoopConfig); + if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) { + fsUri = defaultUri; + } + } + + org.apache.hadoop.fs.FileSystem fileSystem = initFileSystem(fsUri, hadoopConfig); + return new COSFileSystem(fileSystem, getScheme(), hadoopConfig); + } + + protected org.apache.hadoop.fs.FileSystem initFileSystem( + URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException { + CosNFileSystem fileSystem = new CosNFileSystem(); + fileSystem.initialize(fsUri, hadoopConfig); + return fileSystem; + } + + protected void setDefaultCredentialProvider(org.apache.hadoop.conf.Configuration hadoopConfig) { + // use COSSecurityTokenReceiver to update hadoop config to set credentialsProvider + COSSecurityTokenReceiver.updateHadoopConfig(hadoopConfig); + } + + @VisibleForTesting + org.apache.hadoop.conf.Configuration getHadoopConfiguration(Configuration flussConfig) { + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + if (flussConfig == null) { + return conf; + } + + // read all configuration with prefix 'FLUSS_CONFIG_PREFIXES' + for (String key : flussConfig.keySet()) { + for (String prefix : FLUSS_CONFIG_PREFIXES) { + if (key.startsWith(prefix)) { + String value = + flussConfig.getString( + ConfigBuilder.key(key).stringType().noDefaultValue(), null); + conf.set(key, value); + + LOG.debug( + "Adding Fluss config entry for {} as {} to Hadoop config", + key, + conf.get(key)); + } + } + } + return conf; + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenProvider.java b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenProvider.java new file mode 100644 index 0000000000..b0d9491156 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenProvider.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos.token; + +import org.apache.fluss.fs.token.Credentials; +import org.apache.fluss.fs.token.CredentialsJsonSerde; +import org.apache.fluss.fs.token.ObtainedSecurityToken; + +import org.apache.hadoop.conf.Configuration; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.ENDPOINT_KEY; +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.SECRET_ID; +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.SECRET_KEY; + +/** A provider to provide Tencent Cloud COS security token. */ +public class COSSecurityTokenProvider { + + private final String endpoint; + private final String secretId; + private final String secretKey; + + public COSSecurityTokenProvider(Configuration conf) { + endpoint = conf.get(ENDPOINT_KEY); + secretId = conf.get(SECRET_ID); + secretKey = conf.get(SECRET_KEY); + } + + public ObtainedSecurityToken obtainSecurityToken(String scheme) { + // For COS, we directly use the configured secret id and secret key as the token. + // If STS temporary credentials are needed in the future, this can be extended + // to call Tencent Cloud STS API to get temporary credentials. + Map additionInfo = new HashMap<>(); + // we need to put endpoint as addition info + if (endpoint != null) { + additionInfo.put(ENDPOINT_KEY, endpoint); + } + + Credentials credentials = new Credentials(secretId, secretKey, null); + byte[] tokenBytes = CredentialsJsonSerde.toJson(credentials); + + // token does not expire when using static credentials + return new ObtainedSecurityToken(scheme, tokenBytes, Long.MAX_VALUE, additionInfo); + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenReceiver.java b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenReceiver.java new file mode 100644 index 0000000000..63544fdf58 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenReceiver.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos.token; + +import org.apache.fluss.fs.cos.COSFileSystemPlugin; +import org.apache.fluss.fs.token.CredentialsJsonSerde; +import org.apache.fluss.fs.token.ObtainedSecurityToken; +import org.apache.fluss.fs.token.SecurityTokenReceiver; + +import com.qcloud.cos.auth.BasicCOSCredentials; +import com.qcloud.cos.auth.BasicSessionCredentials; +import com.qcloud.cos.auth.COSCredentials; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.CREDENTIALS_PROVIDER; + +/** Security token receiver for Tencent Cloud COS filesystem. */ +public class COSSecurityTokenReceiver implements SecurityTokenReceiver { + + private static final Logger LOG = LoggerFactory.getLogger(COSSecurityTokenReceiver.class); + + static volatile COSCredentials credentials; + static volatile Map additionInfos; + + public static void updateHadoopConfig(org.apache.hadoop.conf.Configuration hadoopConfig) { + updateHadoopConfig(hadoopConfig, DynamicTemporaryCOSCredentialsProvider.NAME); + } + + protected static void updateHadoopConfig( + org.apache.hadoop.conf.Configuration hadoopConfig, String credentialsProviderName) { + LOG.info("Updating Hadoop configuration"); + + String providers = hadoopConfig.get(CREDENTIALS_PROVIDER, ""); + + if (!providers.contains(credentialsProviderName)) { + if (providers.isEmpty()) { + LOG.debug("Setting provider"); + providers = credentialsProviderName; + } else { + providers = credentialsProviderName + "," + providers; + LOG.debug("Prepending provider, new providers value: {}", providers); + } + hadoopConfig.set(CREDENTIALS_PROVIDER, providers); + } else { + LOG.debug("Provider already exists"); + } + + // then, set addition info + if (additionInfos == null) { + // if addition info is null, it also means we have not received any token, + throw new RuntimeException( + "COS credentials have not been received yet. " + + "Ensure onNewTokensObtained() has been called with valid tokens " + + "before invoking COSSecurityTokenReceiver.updateHadoopConfig()."); + } else { + for (Map.Entry entry : additionInfos.entrySet()) { + hadoopConfig.set(entry.getKey(), entry.getValue()); + } + } + + LOG.info("Updated Hadoop configuration successfully"); + } + + @Override + public String scheme() { + return COSFileSystemPlugin.SCHEME; + } + + @Override + public void onNewTokensObtained(ObtainedSecurityToken token) { + LOG.info("Updating session credentials"); + + byte[] tokenBytes = token.getToken(); + + org.apache.fluss.fs.token.Credentials flussCredentials = + CredentialsJsonSerde.fromJson(tokenBytes); + + // Create COSCredentials from fluss credentials, distinguishing between + // static credentials (no session token) and temporary session credentials + if (flussCredentials.getSecurityToken() != null) { + credentials = + new BasicSessionCredentials( + flussCredentials.getAccessKeyId(), + flussCredentials.getSecretAccessKey(), + flussCredentials.getSecurityToken()); + } else { + credentials = + new BasicCOSCredentials( + flussCredentials.getAccessKeyId(), + flussCredentials.getSecretAccessKey()); + } + additionInfos = token.getAdditionInfos(); + + // Consistent with S3DelegationTokenReceiver, logging access key at INFO level + LOG.info( + "Session credentials updated successfully with access key: {}.", + credentials.getCOSAccessKeyId()); + } + + public static COSCredentials getCredentials() { + return credentials; + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/DynamicTemporaryCOSCredentialsProvider.java b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/DynamicTemporaryCOSCredentialsProvider.java new file mode 100644 index 0000000000..71d3e8af8b --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/DynamicTemporaryCOSCredentialsProvider.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos.token; + +import org.apache.fluss.annotation.Internal; + +import com.qcloud.cos.auth.BasicSessionCredentials; +import com.qcloud.cos.auth.COSCredentials; +import com.qcloud.cos.auth.COSCredentialsProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Support dynamic credentials for authenticating with Tencent Cloud COS. It'll get credentials from + * {@link COSSecurityTokenReceiver}. It implements COS native {@link COSCredentialsProvider} to work + * with CosNFileSystem. + * + *

This provider supports both static credentials (secretId/secretKey) and temporary session + * credentials (secretId/secretKey/sessionToken). + */ +@Internal +public class DynamicTemporaryCOSCredentialsProvider implements COSCredentialsProvider { + + private static final Logger LOG = + LoggerFactory.getLogger(DynamicTemporaryCOSCredentialsProvider.class); + + public static final String NAME = DynamicTemporaryCOSCredentialsProvider.class.getName(); + + @Override + public COSCredentials getCredentials() { + COSCredentials credentials = COSSecurityTokenReceiver.getCredentials(); + if (credentials == null) { + throw new RuntimeException( + "COS credentials have not been received yet. " + + "Ensure COSSecurityTokenReceiver has received valid tokens."); + } + if (credentials instanceof BasicSessionCredentials) { + BasicSessionCredentials sessionCredentials = (BasicSessionCredentials) credentials; + LOG.debug("Providing session credentials"); + return new BasicSessionCredentials( + sessionCredentials.getCOSAccessKeyId(), + sessionCredentials.getCOSSecretKey(), + sessionCredentials.getSessionToken()); + } else { + LOG.debug("Providing non-session COS credentials"); + return credentials; + } + } + + @Override + public void refresh() { + // do nothing, credentials are updated by COSSecurityTokenReceiver + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/NOTICE b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..3f53868029 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/NOTICE @@ -0,0 +1,70 @@ +fluss-fs-cos +Copyright 2025-2026 The Apache Software Foundation + +This project includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) + +- com.fasterxml.jackson.core:jackson-annotations:2.15.3 +- com.fasterxml.jackson.core:jackson-core:2.15.3 +- com.fasterxml.jackson.core:jackson-databind:2.15.3 +- com.fasterxml.woodstox:woodstox-core:5.4.0 +- com.google.code.gson:gson:2.2.4 +- com.google.guava:failureaccess:1.0 +- com.google.guava:guava:27.0-jre +- com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava +- com.google.j2objc:j2objc-annotations:1.1 +- com.google.re2j:re2j:1.1 +- com.qcloud:cos_api:5.6.139 +- com.qcloud:cos_api-bundle:5.6.69 +- com.squareup.okhttp:logging-interceptor:2.7.5 +- com.squareup.okhttp:okhttp:2.7.5 +- com.squareup.okio:okio:1.12.0 +- com.tencentcloudapi:tencentcloud-sdk-java-common:3.1.213 +- com.tencentcloudapi:tencentcloud-sdk-java-kms:3.1.213 +- commons-beanutils:commons-beanutils:1.9.4 +- commons-codec:commons-codec:1.13 +- commons-collections:commons-collections:3.2.2 +- commons-io:commons-io:2.14.0 +- commons-logging:commons-logging:1.2 +- dnsjava:dnsjava:3.4.0 +- io.dropwizard.metrics:metrics-core:3.2.4 +- io.netty:netty-buffer:4.1.100.Final +- io.netty:netty-codec:4.1.100.Final +- io.netty:netty-common:4.1.100.Final +- io.netty:netty-handler:4.1.100.Final +- io.netty:netty-resolver:4.1.100.Final +- io.netty:netty-transport-classes-epoll:4.1.100.Final +- io.netty:netty-transport-native-epoll:4.1.100.Final +- io.netty:netty-transport-native-unix-common:4.1.100.Final +- io.netty:netty-transport:4.1.100.Final +- jakarta.activation:jakarta.activation-api:1.2.1 +- joda-time:joda-time:2.9.9 +- org.apache.commons:commons-compress:1.24.0 +- org.apache.commons:commons-configuration2:2.8.0 +- org.apache.commons:commons-lang3:3.18.0 +- org.apache.commons:commons-text:1.10.0 +- org.apache.hadoop:hadoop-annotations:3.4.0 +- org.apache.hadoop:hadoop-auth:3.4.0 +- org.apache.hadoop:hadoop-common:3.4.0 +- org.apache.hadoop:hadoop-cos:3.3.5 +- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.2.0 +- org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_21:1.2.0 +- org.apache.httpcomponents:httpclient:4.5.13 +- org.apache.httpcomponents:httpcore:4.4.13 +- org.apache.kerby:kerb-core:2.0.3 +- org.apache.kerby:kerby-asn1:2.0.3 +- org.apache.kerby:kerby-pkix:2.0.3 +- org.apache.kerby:kerby-util:2.0.3 +- org.bouncycastle:bcprov-jdk15on:1.67 +- org.checkerframework:checker-qual:2.5.2 +- org.codehaus.jettison:jettison:1.5.4 +- org.codehaus.mojo:animal-sniffer-annotations:1.17 +- org.codehaus.woodstox:stax2-api:4.2.1 +- org.xerial.snappy:snappy-java:1.1.10.4 + +This project bundles the following dependencies under the CDDL 1.1 license. +See bundled license files for details. + +- javax.xml.bind:jaxb-api:2.3.1 diff --git a/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/licenses/LICENSE.jaxb b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/licenses/LICENSE.jaxb new file mode 100644 index 0000000000..fd16ea9546 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/licenses/LICENSE.jaxb @@ -0,0 +1,135 @@ +COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL)Version 1.1 + +1. Definitions. + + 1.1. "Contributor" means each individual or entity that creates or contributes to the creation of Modifications. + + 1.2. "Contributor Version" means the combination of the Original Software, prior Modifications used by a Contributor (if any), and the Modifications made by that particular Contributor. + + 1.3. "Covered Software" means (a) the Original Software, or (b) Modifications, or (c) the combination of files containing Original Software with files containing Modifications, in each case including portions thereof. + + 1.4. "Executable" means the Covered Software in any form other than Source Code. + + 1.5. "Initial Developer" means the individual or entity that first makes Original Software available under this License. + + 1.6. "Larger Work" means a work which combines Covered Software or portions thereof with code not governed by the terms of this License. + + 1.7. "License" means this document. + + 1.8. "Licensable" means having the right to grant, to the maximum extent possible, whether at the time of the initial grant or subsequently acquired, any and all of the rights conveyed herein. + + 1.9. "Modifications" means the Source Code and Executable form of any of the following: + + A. Any file that results from an addition to, deletion from or modification of the contents of a file containing Original Software or previous Modifications; + + B. Any new file that contains any part of the Original Software or previous Modification; or + + C. Any new file that is contributed or otherwise made available under the terms of this License. + + 1.10. "Original Software" means the Source Code and Executable form of computer software code that is originally released under this License. + + 1.11. "Patent Claims" means any patent claim(s), now owned or hereafter acquired, including without limitation, method, process, and apparatus claims, in any patent Licensable by grantor. + + 1.12. "Source Code" means (a) the common form of computer software code in which modifications are made and (b) associated documentation included in or with such code. + + 1.13. "You" (or "Your") means an individual or a legal entity exercising rights under, and complying with all of the terms of, this License. For legal entities, "You" includes any entity which controls, is controlled by, or is under common control with You. For purposes of this definition, "control" means (a) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (b) ownership of more than fifty percent (50%) of the outstanding shares or beneficial ownership of such entity. + +2. License Grants. + + 2.1. The Initial Developer Grant. + + Conditioned upon Your compliance with Section 3.1 below and subject to third party intellectual property claims, the Initial Developer hereby grants You a world-wide, royalty-free, non-exclusive license: + + (a) under intellectual property rights (other than patent or trademark) Licensable by Initial Developer, to use, reproduce, modify, display, perform, sublicense and distribute the Original Software (or portions thereof), with or without Modifications, and/or as part of a Larger Work; and + + (b) under Patent Claims infringed by the making, using or selling of Original Software, to make, have made, use, practice, sell, and offer for sale, and/or otherwise dispose of the Original Software (or portions thereof). + + (c) The licenses granted in Sections 2.1(a) and (b) are effective on the date Initial Developer first distributes or otherwise makes the Original Software available to a third party under the terms of this License. + + (d) Notwithstanding Section 2.1(b) above, no patent license is granted: (1) for code that You delete from the Original Software, or (2) for infringements caused by: (i) the modification of the Original Software, or (ii) the combination of the Original Software with other software or devices. + + 2.2. Contributor Grant. + + Conditioned upon Your compliance with Section 3.1 below and subject to third party intellectual property claims, each Contributor hereby grants You a world-wide, royalty-free, non-exclusive license: + + (a) under intellectual property rights (other than patent or trademark) Licensable by Contributor to use, reproduce, modify, display, perform, sublicense and distribute the Modifications created by such Contributor (or portions thereof), either on an unmodified basis, with other Modifications, as Covered Software and/or as part of a Larger Work; and + + (b) under Patent Claims infringed by the making, using, or selling of Modifications made by that Contributor either alone and/or in combination with its Contributor Version (or portions of such combination), to make, use, sell, offer for sale, have made, and/or otherwise dispose of: (1) Modifications made by that Contributor (or portions thereof); and (2) the combination of Modifications made by that Contributor with its Contributor Version (or portions of such combination). + + (c) The licenses granted in Sections 2.2(a) and 2.2(b) are effective on the date Contributor first distributes or otherwise makes the Modifications available to a third party. + + (d) Notwithstanding Section 2.2(b) above, no patent license is granted: (1) for any code that Contributor has deleted from the Contributor Version; (2) for infringements caused by: (i) third party modifications of Contributor Version, or (ii) the combination of Modifications made by that Contributor with other software (except as part of the Contributor Version) or other devices; or (3) under Patent Claims infringed by Covered Software in the absence of Modifications made by that Contributor. + +3. Distribution Obligations. + + 3.1. Availability of Source Code. + + Any Covered Software that You distribute or otherwise make available in Executable form must also be made available in Source Code form and that Source Code form must be distributed only under the terms of this License. You must include a copy of this License with every copy of the Source Code form of the Covered Software You distribute or otherwise make available. You must inform recipients of any such Covered Software in Executable form as to how they can obtain such Covered Software in Source Code form in a reasonable manner on or through a medium customarily used for software exchange. + + 3.2. Modifications. + + The Modifications that You create or to which You contribute are governed by the terms of this License. You represent that You believe Your Modifications are Your original creation(s) and/or You have sufficient rights to grant the rights conveyed by this License. + + 3.3. Required Notices. + + You must include a notice in each of Your Modifications that identifies You as the Contributor of the Modification. You may not remove or alter any copyright, patent or trademark notices contained within the Covered Software, or any notices of licensing or any descriptive text giving attribution to any Contributor or the Initial Developer. + + 3.4. Application of Additional Terms. + + You may not offer or impose any terms on any Covered Software in Source Code form that alters or restricts the applicable version of this License or the recipients' rights hereunder. You may choose to offer, and to charge a fee for, warranty, support, indemnity or liability obligations to one or more recipients of Covered Software. However, you may do so only on Your own behalf, and not on behalf of the Initial Developer or any Contributor. You must make it absolutely clear that any such warranty, support, indemnity or liability obligation is offered by You alone, and You hereby agree to indemnify the Initial Developer and every Contributor for any liability incurred by the Initial Developer or such Contributor as a result of warranty, support, indemnity or liability terms You offer. + + 3.5. Distribution of Executable Versions. + + You may distribute the Executable form of the Covered Software under the terms of this License or under the terms of a license of Your choice, which may contain terms different from this License, provided that You are in compliance with the terms of this License and that the license for the Executable form does not attempt to limit or alter the recipient's rights in the Source Code form from the rights set forth in this License. If You distribute the Covered Software in Executable form under a different license, You must make it absolutely clear that any terms which differ from this License are offered by You alone, not by the Initial Developer or Contributor. You hereby agree to indemnify the Initial Developer and every Contributor for any liability incurred by the Initial Developer or such Contributor as a result of any such terms You offer. + + 3.6. Larger Works. + + You may create a Larger Work by combining Covered Software with other code not governed by the terms of this License and distribute the Larger Work as a single product. In such a case, You must make sure the requirements of this License are fulfilled for the Covered Software. + +4. Versions of the License. + + 4.1. New Versions. + + Oracle is the initial license steward and may publish revised and/or new versions of this License from time to time. Each version will be given a distinguishing version number. Except as provided in Section 4.3, no one other than the license steward has the right to modify this License. + + 4.2. Effect of New Versions. + + You may always continue to use, distribute or otherwise make the Covered Software available under the terms of the version of the License under which You originally received the Covered Software. If the Initial Developer includes a notice in the Original Software prohibiting it from being distributed or otherwise made available under any subsequent version of the License, You must distribute and make the Covered Software available under the terms of the version of the License under which You originally received the Covered Software. Otherwise, You may also choose to use, distribute or otherwise make the Covered Software available under the terms of any subsequent version of the License published by the license steward. + + 4.3. Modified Versions. + + When You are an Initial Developer and You want to create a new license for Your Original Software, You may create and use a modified version of this License if You: (a) rename the license and remove any references to the name of the license steward (except to note that the license differs from this License); and (b) otherwise make it clear that the license contains terms which differ from this License. + +5. DISCLAIMER OF WARRANTY. + + COVERED SOFTWARE IS PROVIDED UNDER THIS LICENSE ON AN "AS IS" BASIS, WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, WITHOUT LIMITATION, WARRANTIES THAT THE COVERED SOFTWARE IS FREE OF DEFECTS, MERCHANTABLE, FIT FOR A PARTICULAR PURPOSE OR NON-INFRINGING. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE COVERED SOFTWARE IS WITH YOU. SHOULD ANY COVERED SOFTWARE PROVE DEFECTIVE IN ANY RESPECT, YOU (NOT THE INITIAL DEVELOPER OR ANY OTHER CONTRIBUTOR) ASSUME THE COST OF ANY NECESSARY SERVICING, REPAIR OR CORRECTION. THIS DISCLAIMER OF WARRANTY CONSTITUTES AN ESSENTIAL PART OF THIS LICENSE. NO USE OF ANY COVERED SOFTWARE IS AUTHORIZED HEREUNDER EXCEPT UNDER THIS DISCLAIMER. + +6. TERMINATION. + + 6.1. This License and the rights granted hereunder will terminate automatically if You fail to comply with terms herein and fail to cure such breach within 30 days of becoming aware of the breach. Provisions which, by their nature, must remain in effect beyond the termination of this License shall survive. + + 6.2. If You assert a patent infringement claim (excluding declaratory judgment actions) against Initial Developer or a Contributor (the Initial Developer or Contributor against whom You assert such claim is referred to as "Participant") alleging that the Participant Software (meaning the Contributor Version where the Participant is a Contributor or the Original Software where the Participant is the Initial Developer) directly or indirectly infringes any patent, then any and all rights granted directly or indirectly to You by such Participant, the Initial Developer (if the Initial Developer is not the Participant) and all Contributors under Sections 2.1 and/or 2.2 of this License shall, upon 60 days notice from Participant terminate prospectively and automatically at the expiration of such 60 day notice period, unless if within such 60 day period You withdraw Your claim with respect to the Participant Software against such Participant either unilaterally or pursuant to a written agreement with Participant. + + 6.3. If You assert a patent infringement claim against Participant alleging that the Participant Software directly or indirectly infringes any patent where such claim is resolved (such as by license or settlement) prior to the initiation of patent infringement litigation, then the reasonable value of the licenses granted by such Participant under Sections 2.1 or 2.2 shall be taken into account in determining the amount or value of any payment or license. + + 6.4. In the event of termination under Sections 6.1 or 6.2 above, all end user licenses that have been validly granted by You or any distributor hereunder prior to termination (excluding licenses granted to You by any distributor) shall survive termination. + +7. LIMITATION OF LIABILITY. + + UNDER NO CIRCUMSTANCES AND UNDER NO LEGAL THEORY, WHETHER TORT (INCLUDING NEGLIGENCE), CONTRACT, OR OTHERWISE, SHALL YOU, THE INITIAL DEVELOPER, ANY OTHER CONTRIBUTOR, OR ANY DISTRIBUTOR OF COVERED SOFTWARE, OR ANY SUPPLIER OF ANY OF SUCH PARTIES, BE LIABLE TO ANY PERSON FOR ANY INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES OF ANY CHARACTER INCLUDING, WITHOUT LIMITATION, DAMAGES FOR LOSS OF GOODWILL, WORK STOPPAGE, COMPUTER FAILURE OR MALFUNCTION, OR ANY AND ALL OTHER COMMERCIAL DAMAGES OR LOSSES, EVEN IF SUCH PARTY SHALL HAVE BEEN INFORMED OF THE POSSIBILITY OF SUCH DAMAGES. THIS LIMITATION OF LIABILITY SHALL NOT APPLY TO LIABILITY FOR DEATH OR PERSONAL INJURY RESULTING FROM SUCH PARTY'S NEGLIGENCE TO THE EXTENT APPLICABLE LAW PROHIBITS SUCH LIMITATION. SOME JURISDICTIONS DO NOT ALLOW THE EXCLUSION OR LIMITATION OF INCIDENTAL OR CONSEQUENTIAL DAMAGES, SO THIS EXCLUSION AND LIMITATION MAY NOT APPLY TO YOU. + +8. U.S. GOVERNMENT END USERS. + + The Covered Software is a "commercial item," as that term is defined in 48 C.F.R. 2.101 (Oct. 1995), consisting of "commercial computer software" (as that term is defined at 48 C.F.R. ? 252.227-7014(a)(1)) and "commercial computer software documentation" as such terms are used in 48 C.F.R. 12.212 (Sept. 1995). Consistent with 48 C.F.R. 12.212 and 48 C.F.R. 227.7202-1 through 227.7202-4 (June 1995), all U.S. Government End Users acquire Covered Software with only those rights set forth herein. This U.S. Government Rights clause is in lieu of, and supersedes, any other FAR, DFAR, or other clause or provision that addresses Government rights in computer software under this License. + +9. MISCELLANEOUS. + + This License represents the complete agreement concerning subject matter hereof. If any provision of this License is held to be unenforceable, such provision shall be reformed only to the extent necessary to make it enforceable. This License shall be governed by the law of the jurisdiction specified in a notice contained within the Original Software (except to the extent applicable law, if any, provides otherwise), excluding such jurisdiction's conflict-of-law provisions. Any litigation relating to this License shall be subject to the jurisdiction of the courts located in the jurisdiction and venue specified in a notice contained within the Original Software, with the losing party responsible for costs, including, without limitation, court costs and reasonable attorneys' fees and expenses. The application of the United Nations Convention on Contracts for the International Sale of Goods is expressly excluded. Any law or regulation which provides that the language of a contract shall be construed against the drafter shall not apply to this License. You agree that You alone are responsible for compliance with the United States export administration regulations (and the export control laws and regulation of any other countries) when You use, distribute or otherwise make available any Covered Software. + +10. RESPONSIBILITY FOR CLAIMS. + + As between Initial Developer and the Contributors, each party is responsible for claims and damages arising, directly or indirectly, out of its utilization of rights under this License and You agree to work with Initial Developer and Contributors to distribute such responsibility on an equitable basis. Nothing herein is intended or shall be deemed to constitute any admission of liability. + +---------- +NOTICE PURSUANT TO SECTION 9 OF THE COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) +The code released under the CDDL shall be governed by the laws of the State of California (excluding conflict-of-law provisions). Any litigation relating to this License shall be subject to the jurisdiction of the Federal Courts of the Northern District of California and the state courts of the State of California, with venue lying in Santa Clara County, California. diff --git a/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.FileSystemPlugin b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.FileSystemPlugin new file mode 100644 index 0000000000..3919000bba --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.FileSystemPlugin @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.fluss.fs.cos.COSFileSystemPlugin diff --git a/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.token.SecurityTokenReceiver b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.token.SecurityTokenReceiver new file mode 100644 index 0000000000..262c317c8a --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.token.SecurityTokenReceiver @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.fluss.fs.cos.token.COSSecurityTokenReceiver diff --git a/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSFileSystemBehaviorITCase.java b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSFileSystemBehaviorITCase.java new file mode 100644 index 0000000000..05977a913d --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSFileSystemBehaviorITCase.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FileSystemBehaviorTestSuite; +import org.apache.fluss.fs.FsPath; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.util.UUID; + +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.ENDPOINT_KEY; +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.SECRET_ID; +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.SECRET_KEY; + +/** + * An implementation of the {@link FileSystemBehaviorTestSuite} for the COS file system with Hadoop + * cos sdk. + */ +class COSFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite { + + private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID(); + + @BeforeAll + static void setup() { + COSTestCredentials.assumeCredentialsAvailable(); + + final Configuration conf = new Configuration(); + conf.setString(ENDPOINT_KEY, COSTestCredentials.getCOSEndpoint()); + conf.setString(SECRET_ID, COSTestCredentials.getCOSSecretId()); + conf.setString(SECRET_KEY, COSTestCredentials.getCOSSecretKey()); + FileSystem.initialize(conf, null); + } + + @Override + protected FileSystem getFileSystem() throws Exception { + return getBasePath().getFileSystem(); + } + + @Override + protected FsPath getBasePath() { + return new FsPath(COSTestCredentials.getTestBucketUri() + TEST_DATA_DIR); + } + + @AfterAll + static void clearFsConfig() { + FileSystem.initialize(new Configuration(), null); + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSTestCredentials.java b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSTestCredentials.java new file mode 100644 index 0000000000..b00a9b906f --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSTestCredentials.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos; + +import javax.annotation.Nullable; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Access to credentials to access COS buckets during integration tests. */ +public class COSTestCredentials { + @Nullable private static final String ENDPOINT = System.getenv("COSN_ENDPOINT"); + + @Nullable private static final String BUCKET = System.getenv("COSN_BUCKET"); + + @Nullable private static final String SECRET_ID = System.getenv("COSN_SECRET_ID"); + + @Nullable private static final String SECRET_KEY = System.getenv("COSN_SECRET_KEY"); + + // ------------------------------------------------------------------------ + + public static boolean credentialsAvailable() { + return isNotEmpty(ENDPOINT) + && isNotEmpty(BUCKET) + && isNotEmpty(SECRET_ID) + && isNotEmpty(SECRET_KEY); + } + + /** Checks if a String is not null and not empty. */ + private static boolean isNotEmpty(@Nullable String str) { + return str != null && !str.isEmpty(); + } + + public static void assumeCredentialsAvailable() { + assumeTrue( + credentialsAvailable(), "No COS credentials available in this test's environment"); + } + + /** + * Get COS endpoint used to connect. + * + * @return COS endpoint + */ + public static String getCOSEndpoint() { + if (ENDPOINT != null) { + return ENDPOINT; + } else { + throw new IllegalStateException("COS endpoint is not available"); + } + } + + /** + * Get COS secret id. + * + * @return COS secret id + */ + public static String getCOSSecretId() { + if (SECRET_ID != null) { + return SECRET_ID; + } else { + throw new IllegalStateException("COS secret id is not available"); + } + } + + /** + * Get COS secret key. + * + * @return COS secret key + */ + public static String getCOSSecretKey() { + if (SECRET_KEY != null) { + return SECRET_KEY; + } else { + throw new IllegalStateException("COS secret key is not available"); + } + } + + public static String getTestBucketUri() { + return getTestBucketUriWithScheme("cosn"); + } + + public static String getTestBucketUriWithScheme(String scheme) { + if (BUCKET != null) { + return scheme + "://" + BUCKET + "/"; + } else { + throw new IllegalStateException("COS test bucket is not available"); + } + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorBaseITCase.java b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorBaseITCase.java new file mode 100644 index 0000000000..446ec04f84 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorBaseITCase.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FileSystemBehaviorTestSuite; + +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.ENDPOINT_KEY; +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.SECRET_ID; +import static org.apache.fluss.fs.cos.COSFileSystemPlugin.SECRET_KEY; + +/** Base IT case for access COS with temporary credentials in hadoop sdk as COS FileSystem. */ +abstract class COSWithTokenFileSystemBehaviorBaseITCase extends FileSystemBehaviorTestSuite { + + static void initFileSystemWithSecretKey() { + COSTestCredentials.assumeCredentialsAvailable(); + + // first init filesystem with secretId/secretKey + final Configuration conf = new Configuration(); + conf.setString(ENDPOINT_KEY, COSTestCredentials.getCOSEndpoint()); + conf.setString(SECRET_ID, COSTestCredentials.getCOSSecretId()); + conf.setString(SECRET_KEY, COSTestCredentials.getCOSSecretKey()); + FileSystem.initialize(conf, null); + } +} diff --git a/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorITCase.java b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorITCase.java new file mode 100644 index 0000000000..c5dd30c863 --- /dev/null +++ b/fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorITCase.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.fs.cos; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.fs.cos.token.COSSecurityTokenReceiver; +import org.apache.fluss.fs.token.ObtainedSecurityToken; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.util.UUID; + +/** IT case for access COS with security token in hadoop sdk as FileSystem. */ +class COSWithTokenFileSystemBehaviorITCase extends COSWithTokenFileSystemBehaviorBaseITCase { + + private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID(); + + @BeforeAll + static void setup() throws Exception { + // init a filesystem with secretId/secretKey so that it can generate token + initFileSystemWithSecretKey(); + // now, we can init with token + initFileSystemWithToken(getFsPath()); + } + + @Override + protected FileSystem getFileSystem() throws Exception { + return getFsPath().getFileSystem(); + } + + @Override + protected FsPath getBasePath() { + return getFsPath(); + } + + protected static FsPath getFsPath() { + return new FsPath(COSTestCredentials.getTestBucketUri() + TEST_DATA_DIR); + } + + @AfterAll + static void clearFsConfig() { + FileSystem.initialize(new Configuration(), null); + } + + private static void initFileSystemWithToken(FsPath fsPath) throws Exception { + Configuration configuration = new Configuration(); + // obtain a security token and call onNewTokensObtained + ObtainedSecurityToken obtainedSecurityToken = fsPath.getFileSystem().obtainSecurityToken(); + COSSecurityTokenReceiver cosSecurityTokenReceiver = new COSSecurityTokenReceiver(); + cosSecurityTokenReceiver.onNewTokensObtained(obtainedSecurityToken); + + FileSystem.initialize(configuration, null); + } +} diff --git a/fluss-filesystems/pom.xml b/fluss-filesystems/pom.xml index e69829be17..6a8d4a9096 100644 --- a/fluss-filesystems/pom.xml +++ b/fluss-filesystems/pom.xml @@ -36,6 +36,7 @@ fluss-fs-gs fluss-fs-azure fluss-fs-obs + fluss-fs-cos fluss-fs-hdfs pom diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 399f64583e..61f799ba40 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -417,6 +417,7 @@ org.apache.fluss.fs.hdfs.HdfsSecurityTokenReceiver org.apache.fluss.fs.oss.* + org.apache.fluss.fs.cos.* org.apache.fluss.fs.s3.* org.apache.fluss.fs.obs.* com.amazonaws.services.s3.model.transform.XmlResponsesSaxParser* diff --git a/website/docs/_configs/_partial_config.mdx b/website/docs/_configs/_partial_config.mdx index 7bb8e0a663..2080b2b9b8 100644 --- a/website/docs/_configs/_partial_config.mdx +++ b/website/docs/_configs/_partial_config.mdx @@ -78,7 +78,7 @@ | `client.lookup.batch-timeout` | `0 s` | Duration | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | | `client.lookup.max-retries` | `2147483647` | Integer | Setting a value greater than zero will cause the client to resend any lookup request that fails with a potentially transient error. | | `client.scanner.remote-log.prefetch-num` | `4` | Integer | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. | -| `client.scanner.io.tmpdir` | `/var/folders/bp/v2l48kz51mx86d743qv0zhzh0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily | +| `client.scanner.io.tmpdir` | `/var/folders/7r/lwdsh9ms4gn0fnxs8c6fcfpm0000gn/T//fluss` | String | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily | | `client.remote-file.download-thread-num` | `3` | Integer | The number of threads the client uses to download remote files. | | `client.filesystem.security.token.renewal.backoff` | `1 hours` | Duration | The time period how long to wait before retrying to obtain new security tokens for filesystem after a failure. | | `client.filesystem.security.token.renewal.time-ratio` | `0.75` | Double | Ratio of the token's expiration time when new credentials for access filesystem should be re-obtained. | @@ -238,9 +238,13 @@ | Key | Default | Type | Description | | :--- | :--- | :--- | :--- | -| `remote.data.dir` | `none` | String | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. | +| `remote.data.dir` | `none` | String | The directory used for storing the kv snapshot data files and remote log for log tiered storage in a Fluss supported filesystem. When upgrading to `remote.data.dirs`, please ensure this value is placed as the first entry in the new configuration.For new clusters, it is recommended to use `remote.data.dirs` instead. If `remote.data.dirs` is configured, this value will be ignored. | +| `remote.data.dirs` | `[]` | ArrayList | A comma-separated list of directories in Fluss supported filesystems for storing the kv snapshot data files and remote log files of tables/partitions. If configured, when a new table or a new partition is created, one of the directories from this list will be selected according to the strategy specified by `remote.data.dirs.strategy` (`ROUND_ROBIN` by default). If not configured, the system uses `remote.data.dir` as the sole remote data directory for all data. | +| `remote.data.dirs.strategy` | `ROUND_ROBIN` | RemoteDataDirStrategy | The strategy for selecting the remote data directory from `remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN. ROUND_ROBIN: this strategy employs a round-robin approach to select one from the available remote directories. WEIGHTED_ROUND_ROBIN: this strategy selects one of the available remote directories based on the weights configured in `remote.data.dirs.weights`. | +| `remote.data.dirs.weights` | `[]` | ArrayList | The weights of the remote data directories. This is a list of weights corresponding to the `remote.data.dirs` in the same order. When `remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be configured, and its size must be equal to `remote.data.dirs`; otherwise, it will be ignored. | | `remote.fs.write-buffer-size` | `4 kb` | MemorySize | The default size of the write buffer for writing the local files to remote file systems. | | `remote.log.task-interval-duration` | `1 min` | Duration | Interval at which remote log manager runs the scheduled tasks like copy segments, clean up remote log segments, delete local log segments etc. If the value is set to 0, it means that the remote log storage is disabled. | +| `remote.log.task-max-upload-segments` | `5` | Integer | The maximum number of log segments to upload to remote storage per tiering task execution. This limits the upload batch size to prevent overwhelming the remote storage when there is a large backlog of segments to upload. | | `remote.log.index-file-cache-size` | `1 gb` | MemorySize | The total size of the space allocated to store index files fetched from remote storage in the local storage. | | `remote.log-manager.thread-pool-size` | `4` | Integer | Size of the thread pool used in scheduling tasks to copy segments, fetch remote log indexes and clean up remote log segments. | | `remote.log.data-transfer-thread-num` | `4` | Integer | The number of threads the server uses to transfer (download and upload) remote log file can be data file, index file and remote log metadata file. This option is deprecated. Please use server.io-pool.size instead. | @@ -284,7 +288,7 @@ | `table.auto-partition.enabled` | `false` | Boolean | Whether enable auto partition for the table. Disable by default. When auto partition is enabled, the partitions of the table will be created automatically. | | `table.auto-partition.key` | `none` | String | This configuration defines the time-based partition key to be used for auto-partitioning when a table is partitioned with multiple keys. Auto-partitioning utilizes a time-based partition key to handle partitions automatically, including creating new ones and removing outdated ones, by comparing the time value of the partition with the current system time. In the case of a table using multiple partition keys (such as a composite partitioning strategy), this feature determines which key should serve as the primary time dimension for making auto-partitioning decisions.And If the table has only one partition key, this config is not necessary. Otherwise, it must be specified. | | `table.auto-partition.time-unit` | `DAY` | AutoPartitionTimeUnit | The time granularity for auto created partitions. The default value is `DAY`. Valid values are `HOUR`, `DAY`, `MONTH`, `QUARTER`, `YEAR`. If the value is `HOUR`, the partition format for auto created is yyyyMMddHH. If the value is `DAY`, the partition format for auto created is yyyyMMdd. If the value is `MONTH`, the partition format for auto created is yyyyMM. If the value is `QUARTER`, the partition format for auto created is yyyyQ. If the value is `YEAR`, the partition format for auto created is yyyy. | -| `table.auto-partition.time-zone` | `Europe/Paris` | String | The time zone for auto partitions, which is by default the same as the system time zone. | +| `table.auto-partition.time-zone` | `Asia/Shanghai` | String | The time zone for auto partitions, which is by default the same as the system time zone. | | `table.auto-partition.num-precreate` | `2` | Integer | The number of partitions to pre-create for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11 and the value is configured as 3, then partitions 20241111, 20241112, 20241113 will be pre-created. If any one partition exists, it'll skip creating the partition. The default value is 2, which means 2 partitions will be pre-created. If the `table.auto-partition.time-unit` is `DAY`(default), one precreated partition is for today and another one is for tomorrow.For a partition table with multiple partition keys, pre-create is unsupported and will be set to 0 automatically when creating table if it is not explicitly specified. | | `table.auto-partition.num-retention` | `7` | Integer | The number of history partitions to retain for auto created partitions in each check for auto partition. For example, if the current check time is 2024-11-11, time-unit is DAY, and the value is configured as 3, then the history partitions 20241108, 20241109, 20241110 will be retained. The partitions earlier than 20241108 will be deleted. The default value is 7. | | `table.log.ttl` | `168 hours` | Duration | The time to live for log segments. The configuration controls the maximum time we will retain a log before we will delete old segments to free up space. If set to -1, the log will not be deleted. |