diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSArchiveOperations.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSArchiveOperations.java new file mode 100644 index 000000000000..0f0c89e99c7a --- /dev/null +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSArchiveOperations.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.oss; + +import org.apache.paimon.fs.StorageType; +import org.apache.paimon.utils.ReflectionUtils; + +import com.aliyun.oss.ClientException; +import com.aliyun.oss.OSSClient; +import com.aliyun.oss.OSSException; +import com.aliyun.oss.internal.OSSHeaders; +import com.aliyun.oss.model.CopyObjectRequest; +import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.RestoreConfiguration; +import com.aliyun.oss.model.RestoreJobParameters; +import com.aliyun.oss.model.RestoreObjectRequest; +import com.aliyun.oss.model.RestoreTier; +import com.aliyun.oss.model.StorageClass; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem; +import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + +/** OSS archive operation helpers. */ +class OSSArchiveOperations { + + private static final long SECONDS_PER_DAY = 24 * 60 * 60; + + private static final String RESTORE_ALREADY_IN_PROGRESS = "RestoreAlreadyInProgress"; + + private OSSArchiveOperations() {} + + static StorageClass archiveStorageClass(StorageType type) { + switch (checkNotNull(type, "Storage type must not be null.")) { + case ARCHIVE: + return StorageClass.Archive; + case COLD_ARCHIVE: + return StorageClass.ColdArchive; + default: + throw new IllegalArgumentException( + "Unsupported OSS archive storage type: " + type + ". Use unarchive."); + } + } + + static StorageClass unarchiveStorageClass(StorageType type) { + if (checkNotNull(type, "Storage type must not be null.") == StorageType.STANDARD) { + return StorageClass.Standard; + } + throw new IllegalArgumentException("Unsupported OSS unarchive storage type: " + type); + } + + static int restoreDays(Duration duration) { + checkNotNull(duration, "Restore duration must not be null."); + checkArgument( + !duration.isZero() && !duration.isNegative(), + "Restore duration must be greater than zero."); + + long seconds = duration.getSeconds(); + long days = seconds / SECONDS_PER_DAY; + if (seconds % SECONDS_PER_DAY != 0 || duration.getNano() > 0) { + days++; + } + + checkArgument(days <= Integer.MAX_VALUE, "Restore duration is too large: %s", duration); + return (int) days; + } + + static CopyObjectRequest copyObjectRequest( + String bucket, String key, ObjectMetadata sourceMetadata, StorageClass storageClass) { + CopyObjectRequest request = new CopyObjectRequest(bucket, key, bucket, key); + request.setNewObjectMetadata(objectMetadata(sourceMetadata, storageClass)); + return request; + } + + static ObjectMetadata objectMetadata(ObjectMetadata sourceMetadata, StorageClass storageClass) { + ObjectMetadata metadata = new ObjectMetadata(); + if (sourceMetadata != null) { + if (sourceMetadata.getContentType() != null) { + metadata.setContentType(sourceMetadata.getContentType()); + } + if (sourceMetadata.getContentEncoding() != null) { + metadata.setContentEncoding(sourceMetadata.getContentEncoding()); + } + if (sourceMetadata.getCacheControl() != null) { + metadata.setCacheControl(sourceMetadata.getCacheControl()); + } + if (sourceMetadata.getContentDisposition() != null) { + metadata.setContentDisposition(sourceMetadata.getContentDisposition()); + } + if (sourceMetadata.getServerSideEncryption() != null) { + metadata.setServerSideEncryption(sourceMetadata.getServerSideEncryption()); + } + if (sourceMetadata.getServerSideEncryptionKeyId() != null) { + metadata.setServerSideEncryptionKeyId( + sourceMetadata.getServerSideEncryptionKeyId()); + } + if (sourceMetadata.getServerSideDataEncryption() != null) { + metadata.setServerSideDataEncryption(sourceMetadata.getServerSideDataEncryption()); + } + + Map userMetadata = sourceMetadata.getUserMetadata(); + if (userMetadata != null && !userMetadata.isEmpty()) { + metadata.setUserMetadata(new HashMap<>(userMetadata)); + } + } + metadata.setHeader(OSSHeaders.OSS_STORAGE_CLASS, storageClass); + return metadata; + } + + static RestoreObjectRequest restoreObjectRequest(String bucket, String key, int days) { + return new RestoreObjectRequest( + bucket, + key, + new RestoreConfiguration( + days, new RestoreJobParameters(RestoreTier.RESTORE_TIER_STANDARD))); + } + + static void changeStorageClass( + AliyunOSSFileSystem fileSystem, + org.apache.hadoop.fs.Path path, + StorageClass storageClass, + String operation) + throws IOException { + String bucket = bucket(fileSystem); + String key = pathToKey(fileSystem, path); + + try { + OSSClient client = ossClient(fileSystem); + client.copyObject( + copyObjectRequest( + bucket, key, client.getObjectMetadata(bucket, key), storageClass)); + } catch (OSSException e) { + throw new IOException(failureMessage(operation, path, storageClass), e); + } catch (ClientException e) { + throw new IOException(failureMessage(operation, path, storageClass), e); + } + } + + static void restoreArchive( + AliyunOSSFileSystem fileSystem, org.apache.hadoop.fs.Path path, int days) + throws IOException { + String bucket = bucket(fileSystem); + String key = pathToKey(fileSystem, path); + + try { + ossClient(fileSystem).restoreObject(restoreObjectRequest(bucket, key, days)); + } catch (OSSException e) { + if (isRestoreAlreadyInProgress(e)) { + return; + } + throw new IOException("Failed to restore archived OSS object " + path + ".", e); + } catch (ClientException e) { + throw new IOException("Failed to restore archived OSS object " + path + ".", e); + } + } + + static boolean isRestoreAlreadyInProgress(OSSException exception) { + return RESTORE_ALREADY_IN_PROGRESS.equals(exception.getErrorCode()); + } + + static String pathToKey(AliyunOSSFileSystem fileSystem, org.apache.hadoop.fs.Path path) { + org.apache.hadoop.fs.Path qualified = path; + if (!qualified.isAbsolute()) { + qualified = new org.apache.hadoop.fs.Path(fileSystem.getWorkingDirectory(), path); + } + + String key = qualified.toUri().getPath(); + return key.startsWith("/") ? key.substring(1) : key; + } + + private static String bucket(AliyunOSSFileSystem fileSystem) { + return fileSystem.getUri().getHost(); + } + + private static OSSClient ossClient(AliyunOSSFileSystem fileSystem) throws IOException { + AliyunOSSFileSystemStore store = fileSystem.getStore(); + try { + return ReflectionUtils.getPrivateFieldValue(store, "ossClient"); + } catch (Exception e) { + throw new IOException("Failed to access OSS client from AliyunOSSFileSystem.", e); + } + } + + private static String failureMessage( + String operation, org.apache.hadoop.fs.Path path, StorageClass storageClass) { + String message = + "Failed to " + operation + " OSS object " + path + " to " + storageClass + "."; + if ("unarchive".equals(operation)) { + message += " Restore it before unarchiving if it is archived."; + } + return message; + } +} diff --git a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java index 85d7e8cf9ec8..cd2b41eae981 100644 --- a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java +++ b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.HadoopOptionsProvider; import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.StorageType; import org.apache.paimon.fs.TwoPhaseOutputStream; import org.apache.paimon.options.Options; import org.apache.paimon.utils.IOUtils; @@ -39,9 +40,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; @@ -128,6 +131,37 @@ public TwoPhaseOutputStream newTwoPhaseOutputStream(Path path, boolean overwrite path); } + @Override + public Optional archive(Path path, StorageType type) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + OSSArchiveOperations.changeStorageClass( + (AliyunOSSFileSystem) getFileSystem(hadoopPath), + hadoopPath, + OSSArchiveOperations.archiveStorageClass(type), + "archive"); + return Optional.empty(); + } + + @Override + public void restoreArchive(Path path, Duration duration) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + OSSArchiveOperations.restoreArchive( + (AliyunOSSFileSystem) getFileSystem(hadoopPath), + hadoopPath, + OSSArchiveOperations.restoreDays(duration)); + } + + @Override + public Optional unarchive(Path path, StorageType type) throws IOException { + org.apache.hadoop.fs.Path hadoopPath = path(path); + OSSArchiveOperations.changeStorageClass( + (AliyunOSSFileSystem) getFileSystem(hadoopPath), + hadoopPath, + OSSArchiveOperations.unarchiveStorageClass(type), + "unarchive"); + return Optional.empty(); + } + public Options hadoopOptions() { return hadoopOptions; } diff --git a/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSArchiveOperationsTest.java b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSArchiveOperationsTest.java new file mode 100644 index 000000000000..a0fac0072582 --- /dev/null +++ b/paimon-filesystems/paimon-oss-impl/src/test/java/org/apache/paimon/oss/OSSArchiveOperationsTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.oss; + +import org.apache.paimon.fs.StorageType; + +import com.aliyun.oss.OSSException; +import com.aliyun.oss.internal.OSSHeaders; +import com.aliyun.oss.model.CopyObjectRequest; +import com.aliyun.oss.model.ObjectMetadata; +import com.aliyun.oss.model.RestoreObjectRequest; +import com.aliyun.oss.model.RestoreTier; +import com.aliyun.oss.model.StorageClass; +import org.junit.jupiter.api.Test; + +import java.time.Duration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link OSSArchiveOperations}. */ +class OSSArchiveOperationsTest { + + @Test + void testArchiveStorageClassMapping() { + assertThat(OSSArchiveOperations.archiveStorageClass(StorageType.ARCHIVE)) + .isEqualTo(StorageClass.Archive); + assertThat(OSSArchiveOperations.archiveStorageClass(StorageType.COLD_ARCHIVE)) + .isEqualTo(StorageClass.ColdArchive); + + assertThatThrownBy(() -> OSSArchiveOperations.archiveStorageClass(StorageType.STANDARD)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Use unarchive"); + } + + @Test + void testUnarchiveStorageClassMapping() { + assertThat(OSSArchiveOperations.unarchiveStorageClass(StorageType.STANDARD)) + .isEqualTo(StorageClass.Standard); + + assertThatThrownBy(() -> OSSArchiveOperations.unarchiveStorageClass(StorageType.ARCHIVE)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported OSS unarchive storage type"); + assertThatThrownBy( + () -> OSSArchiveOperations.unarchiveStorageClass(StorageType.COLD_ARCHIVE)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported OSS unarchive storage type"); + } + + @Test + void testRestoreDays() { + assertThat(OSSArchiveOperations.restoreDays(Duration.ofNanos(1))).isEqualTo(1); + assertThat(OSSArchiveOperations.restoreDays(Duration.ofHours(24))).isEqualTo(1); + assertThat(OSSArchiveOperations.restoreDays(Duration.ofHours(25))).isEqualTo(2); + assertThat(OSSArchiveOperations.restoreDays(Duration.ofDays(7))).isEqualTo(7); + + assertThatThrownBy(() -> OSSArchiveOperations.restoreDays(Duration.ZERO)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("greater than zero"); + assertThatThrownBy(() -> OSSArchiveOperations.restoreDays(Duration.ofSeconds(-1))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("greater than zero"); + } + + @Test + void testCopyObjectRequest() { + ObjectMetadata sourceMetadata = new ObjectMetadata(); + sourceMetadata.setContentType("application/octet-stream"); + sourceMetadata.setCacheControl("no-cache"); + sourceMetadata.addUserMetadata("owner", "paimon"); + + CopyObjectRequest request = + OSSArchiveOperations.copyObjectRequest( + "bucket", "partition/data.orc", sourceMetadata, StorageClass.Archive); + + assertThat(request.getSourceBucketName()).isEqualTo("bucket"); + assertThat(request.getSourceKey()).isEqualTo("partition/data.orc"); + assertThat(request.getDestinationBucketName()).isEqualTo("bucket"); + assertThat(request.getDestinationKey()).isEqualTo("partition/data.orc"); + assertThat(request.getNewObjectMetadata().getContentType()) + .isEqualTo("application/octet-stream"); + assertThat(request.getNewObjectMetadata().getCacheControl()).isEqualTo("no-cache"); + assertThat(request.getNewObjectMetadata().getUserMetadata()) + .containsEntry("owner", "paimon"); + assertThat(request.getNewObjectMetadata().getRawMetadata()) + .containsEntry(OSSHeaders.OSS_STORAGE_CLASS, StorageClass.Archive); + } + + @Test + void testRestoreObjectRequest() { + RestoreObjectRequest request = + OSSArchiveOperations.restoreObjectRequest("bucket", "partition/data.orc", 7); + + assertThat(request.getBucketName()).isEqualTo("bucket"); + assertThat(request.getKey()).isEqualTo("partition/data.orc"); + assertThat(request.getRestoreConfiguration().getDays()).isEqualTo(7); + assertThat(request.getRestoreConfiguration().getRestoreJobParameters().getRestoreTier()) + .isEqualTo(RestoreTier.RESTORE_TIER_STANDARD); + } + + @Test + void testRestoreAlreadyInProgress() { + assertThat( + OSSArchiveOperations.isRestoreAlreadyInProgress( + ossException("RestoreAlreadyInProgress"))) + .isTrue(); + assertThat(OSSArchiveOperations.isRestoreAlreadyInProgress(ossException("OtherError"))) + .isFalse(); + } + + private static OSSException ossException(String errorCode) { + return new OSSException( + "message", errorCode, "request-id", "host-id", "raw", "resource", "header"); + } +}