diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index f961ed118..5df200ddf 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -43,12 +43,22 @@ jobs:
     timeout-minutes: 30
     strategy:
       fail-fast: false
+    env:
+      ICEBERG_TEST_S3_URI: s3://iceberg-test # bucket created by ci/scripts/start_minio.sh; S3 IO tests skip when unset
+      AWS_ACCESS_KEY_ID: minio # same value as the MINIO_ROOT_USER default in ci/scripts/start_minio.sh
+      AWS_SECRET_ACCESS_KEY: minio123 # same value as the MINIO_ROOT_PASSWORD default in ci/scripts/start_minio.sh
+      AWS_DEFAULT_REGION: us-east-1
+      AWS_ENDPOINT_URL: http://127.0.0.1:9000 # same value as the MINIO_ENDPOINT default in ci/scripts/start_minio.sh
+      AWS_EC2_METADATA_DISABLED: "TRUE"
     steps:
       - name: Checkout iceberg-cpp
         uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
       - name: Install dependencies
         shell: bash
         run: sudo apt-get update && sudo apt-get install -y libcurl4-openssl-dev
+      - name: Start MinIO
+        shell: bash
+        run: bash ci/scripts/start_minio.sh
       - name: Build Iceberg
         shell: bash
         env:
@@ -67,9 +77,19 @@ jobs:
     timeout-minutes: 30
     strategy:
       fail-fast: false
+    env:
+      ICEBERG_TEST_S3_URI: s3://iceberg-test # bucket created by ci/scripts/start_minio.sh; S3 IO tests skip when unset
+      AWS_ACCESS_KEY_ID: minio # same value as the MINIO_ROOT_USER default in ci/scripts/start_minio.sh
+      AWS_SECRET_ACCESS_KEY: minio123 # same value as the MINIO_ROOT_PASSWORD default in ci/scripts/start_minio.sh
+      AWS_DEFAULT_REGION: us-east-1
+      AWS_ENDPOINT_URL: http://127.0.0.1:9000 # same value as the MINIO_ENDPOINT default in ci/scripts/start_minio.sh
+      AWS_EC2_METADATA_DISABLED: "TRUE"
     steps:
       - name: Checkout iceberg-cpp
         uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+      - name: Start MinIO
+        shell: bash
+        run: bash ci/scripts/start_minio.sh
       - name: Build Iceberg
         shell: bash
         run: ci/scripts/build_iceberg.sh $(pwd)
@@ -82,6 +102,13 @@ jobs:
     timeout-minutes: 60
     strategy:
       fail-fast: false
+    env:
+      ICEBERG_TEST_S3_URI: s3://iceberg-test # bucket created by ci/scripts/start_minio.sh; S3 IO tests skip when unset
+      AWS_ACCESS_KEY_ID: minio # same value as the MINIO_ROOT_USER default in ci/scripts/start_minio.sh
+      AWS_SECRET_ACCESS_KEY: minio123 # same value as the MINIO_ROOT_PASSWORD default in ci/scripts/start_minio.sh
+      AWS_DEFAULT_REGION: us-east-1
+      AWS_ENDPOINT_URL: http://127.0.0.1:9000 # same value as the MINIO_ENDPOINT default in ci/scripts/start_minio.sh
+      AWS_EC2_METADATA_DISABLED: "TRUE"
     steps:
       - name: Checkout iceberg-cpp
         uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -91,6 +118,9 @@ jobs:
           vcpkg install zlib:x64-windows nlohmann-json:x64-windows nanoarrow:x64-windows roaring:x64-windows cpr:x64-windows
       - name: Setup sccache
         uses: mozilla-actions/sccache-action@7d986dd989559c6ecdb630a3fd2557667be217ad # v0.0.9
+      -
name: Start MinIO
+        shell: bash
+        run: bash ci/scripts/start_minio.sh
       - name: Build Iceberg
         shell: cmd
         env:
diff --git a/ci/scripts/start_minio.sh b/ci/scripts/start_minio.sh
new file mode 100644
index 000000000..e3f416509
--- /dev/null
+++ b/ci/scripts/start_minio.sh
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -eux
+
+MINIO_ROOT_USER="${MINIO_ROOT_USER:-minio}"
+MINIO_ROOT_PASSWORD="${MINIO_ROOT_PASSWORD:-minio123}"
+MINIO_IMAGE="${MINIO_IMAGE:-minio/minio:RELEASE.2024-12-18T00-00-00Z}"
+MINIO_CONTAINER_NAME="${MINIO_CONTAINER_NAME:-iceberg-minio}"
+MINIO_PORT="${MINIO_PORT:-9000}"
+MINIO_CONSOLE_PORT="${MINIO_CONSOLE_PORT:-9001}"
+MINIO_BUCKET="${MINIO_BUCKET:-iceberg-test}"
+MINIO_ENDPOINT="${MINIO_ENDPOINT:-http://127.0.0.1:${MINIO_PORT}}"
+
+wait_for_minio() {
+  for i in {1..30}; do
+    if curl -fsS "${MINIO_ENDPOINT}/minio/health/ready" >/dev/null; then
+      return 0
+    fi
+    sleep 1
+  done
+  return 1
+}
+
+start_minio_docker() {
+  if ! command -v docker >/dev/null 2>&1; then
+    return 1
+  fi
+
+  if docker ps -a --format '{{.Names}}' | grep -q "^${MINIO_CONTAINER_NAME}\$"; then
+    docker rm -f "${MINIO_CONTAINER_NAME}"
+  fi
+
+  # Container port 9001 is what gets published below, so the console must listen
+  # there; MINIO_CONSOLE_PORT only selects the host-side port.
+  docker run -d --name "${MINIO_CONTAINER_NAME}" \
+    -p "${MINIO_PORT}:9000" -p "${MINIO_CONSOLE_PORT}:9001" \
+    -e "MINIO_ROOT_USER=${MINIO_ROOT_USER}" \
+    -e "MINIO_ROOT_PASSWORD=${MINIO_ROOT_PASSWORD}" \
+    "${MINIO_IMAGE}" \
+    server /data --console-address ":9001"
+
+  wait_for_minio
+}
+
+start_minio_macos() {
+  if ! command -v brew >/dev/null 2>&1; then
+    echo "brew is required to start MinIO on macOS without Docker" >&2
+    return 1
+  fi
+
+  brew install minio
+  # The credential variables above are not exported, so hand them (and the API
+  # port) to the server explicitly; create_bucket logs in with the same values.
+  MINIO_ROOT_USER="${MINIO_ROOT_USER}" MINIO_ROOT_PASSWORD="${MINIO_ROOT_PASSWORD}" \
+    minio server /tmp/minio --address ":${MINIO_PORT}" \
+    --console-address ":${MINIO_CONSOLE_PORT}" &
+  wait_for_minio
+}
+
+download_mc() {
+  local uname_out
+  uname_out="$(uname -s)"
+
+  local mc_dir
+  mc_dir="${RUNNER_TEMP:-/tmp}"
+  mkdir -p "${mc_dir}"
+
+  # Pick the client build that matches the CPU (for example arm64 macOS hosts).
+  local mc_arch="amd64"
+  case "$(uname -m)" in
+    arm64|aarch64) mc_arch="arm64" ;;
+  esac
+
+  case "${uname_out}" in
+    Linux*)
+      MC_BIN="${mc_dir}/mc"
+      curl -fsSL "https://dl.min.io/client/mc/release/linux-${mc_arch}/mc" -o "${MC_BIN}"
+      chmod +x "${MC_BIN}"
+      ;;
+    Darwin*)
+      MC_BIN="${mc_dir}/mc"
+      curl -fsSL "https://dl.min.io/client/mc/release/darwin-${mc_arch}/mc" -o "${MC_BIN}"
+      chmod +x "${MC_BIN}"
+      ;;
+    MINGW*|MSYS*|CYGWIN*)
+      MC_BIN="${mc_dir}/mc.exe"
+      curl -fsSL "https://dl.min.io/client/mc/release/windows-amd64/mc.exe" -o "${MC_BIN}"
+      ;;
+    *)
+      echo "Unsupported OS for mc: ${uname_out}" >&2
+      return 1
+      ;;
+  esac
+}
+
+create_bucket() {
+  download_mc
+  for i in {1..30}; do
+    if "${MC_BIN}" alias set local "${MINIO_ENDPOINT}" "${MINIO_ROOT_USER}" "${MINIO_ROOT_PASSWORD}"; then
+      break
+    fi
+    sleep 1
+  done
+  "${MC_BIN}" mb --ignore-existing "local/${MINIO_BUCKET}"
+}
+
+case "$(uname -s)" in
+  Darwin*)
+    if ! start_minio_docker; then
+      start_minio_macos
+    fi
+    ;;
+  Linux*|MINGW*|MSYS*|CYGWIN*)
+    start_minio_docker
+    ;;
+  *)
+    echo "Unsupported OS: $(uname -s)" >&2
+    exit 1
+    ;;
+esac
+
+create_bucket
diff --git a/cmake_modules/IcebergThirdpartyToolchain.cmake b/cmake_modules/IcebergThirdpartyToolchain.cmake
index 277459364..5b17b0997 100644
--- a/cmake_modules/IcebergThirdpartyToolchain.cmake
+++ b/cmake_modules/IcebergThirdpartyToolchain.cmake
@@ -87,6 +87,7 @@ function(resolve_arrow_dependency)
   # Work around undefined symbol: arrow::ipc::ReadSchema(arrow::io::InputStream*, arrow::ipc::DictionaryMemo*)
   set(ARROW_IPC ON)
   set(ARROW_FILESYSTEM ON)
+  set(ARROW_S3 ON)
   set(ARROW_JSON ON)
   set(ARROW_PARQUET ON)
   set(ARROW_SIMD_LEVEL "NONE")
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 5d3ff33b3..383701a08 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -39,6 +39,7 @@ set(ICEBERG_SOURCES
     expression/rewrite_not.cc
     expression/strict_metrics_evaluator.cc
     expression/term.cc
+    file_io_registry.cc
     file_reader.cc
     file_writer.cc
     inheritable_metadata.cc
@@ -171,6 +172,8 @@ add_subdirectory(util)
 if(ICEBERG_BUILD_BUNDLE)
   set(ICEBERG_BUNDLE_SOURCES
       arrow/arrow_fs_file_io.cc
+      arrow/arrow_s3_file_io.cc
+      arrow/file_io_register.cc
       arrow/metadata_column_util.cc
       avro/avro_data_util.cc
       avro/avro_direct_decoder.cc
diff --git a/src/iceberg/arrow/arrow_file_io.h b/src/iceberg/arrow/arrow_file_io.h
index 12a9b2303..514881b11 100644
--- a/src/iceberg/arrow/arrow_file_io.h
+++ b/src/iceberg/arrow/arrow_file_io.h
@@ -20,9 +20,12 @@
 #pragma once
 
 #include <memory>
+#include <string>
+#include <unordered_map>
 
 #include "iceberg/file_io.h"
 #include "iceberg/iceberg_bundle_export.h"
+#include "iceberg/result.h"
 
 namespace iceberg::arrow {
 
@@ -30,4 +33,18 @@ ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeMockFileIO();
 
 ICEBERG_BUNDLE_EXPORT std::unique_ptr<FileIO> MakeLocalFileIO();
 
+/// \brief Create an S3 FileIO backed by Arrow's S3FileSystem.
+///
+/// This function initializes the S3 subsystem if not already initialized (thread-safe).
+/// The S3 initialization is done once per process using std::call_once.
+///
+/// \param uri An S3 URI (must start with "s3://") used to validate the scheme.
+/// \param properties Optional configuration properties for S3 access. See S3Properties
+/// for available keys (credentials, region, endpoint, timeouts, etc.).
+/// \return A FileIO instance for S3 operations, or an error if S3 is not supported
+/// or the URI is invalid.
+ICEBERG_BUNDLE_EXPORT Result<std::unique_ptr<FileIO>> MakeS3FileIO(
+    const std::string& uri,
+    const std::unordered_map<std::string, std::string>& properties = {});
+
 }  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_s3_file_io.cc b/src/iceberg/arrow/arrow_s3_file_io.cc
new file mode 100644
index 000000000..9090d7b35
--- /dev/null
+++ b/src/iceberg/arrow/arrow_s3_file_io.cc
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <exception>
+#include <mutex>
+#include <string_view>
+
+#include <arrow/filesystem/filesystem.h>
+#include <arrow/status.h>
+#if __has_include(<arrow/filesystem/s3fs.h>)
+#include <arrow/filesystem/s3fs.h>
+#define ICEBERG_ARROW_HAS_S3 1
+#else
+#define ICEBERG_ARROW_HAS_S3 0
+#endif
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/s3_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::arrow {
+
+namespace {
+
+/// \brief Return true if `uri` starts with the "s3://" scheme.
+bool IsS3Uri(std::string_view uri) { return uri.starts_with("s3://"); }
+
+/// \brief Initialize Arrow's S3 subsystem at most once per process.
+///
+/// \return OK if S3 is usable, the initialization error otherwise, or
+/// NotImplemented when Arrow was built without S3 support.
+Status EnsureS3Initialized() {
+#if ICEBERG_ARROW_HAS_S3
+  static std::once_flag init_flag;
+  static ::arrow::Status init_status = ::arrow::Status::OK();
+  std::call_once(init_flag, []() {
+    // The host application may already have initialized S3. InitializeS3()
+    // reports a second initialization as an error, so leave that setup alone.
+    if (::arrow::fs::IsS3Initialized()) {
+      return;
+    }
+    ::arrow::fs::S3GlobalOptions options;
+    init_status = ::arrow::fs::InitializeS3(options);
+    if (init_status.ok()) {
+      // EnsureS3Finalized() is a no-op when S3 has already been finalized.
+      std::atexit([]() { (void)::arrow::fs::EnsureS3Finalized(); });
+    }
+  });
+  if (!init_status.ok()) {
+    return std::unexpected<Error>{
+        {.kind = ::iceberg::arrow::ToErrorKind(init_status),
+         .message = init_status.ToString()}};
+  }
+  return {};
+#else
+  return NotImplemented("Arrow S3 support is not enabled");
+#endif
+}
+
+#if ICEBERG_ARROW_HAS_S3
+/// \brief Parse a millisecond timeout property into seconds.
+///
+/// \param key Property name, used only in the error message.
+/// \param value Property value, expected to be a number of milliseconds.
+/// \return The timeout in seconds, or InvalidArgument if `value` is not a number.
+Result<double> ParseTimeoutSeconds(const std::string& key, const std::string& value) {
+  try {
+    size_t consumed = 0;
+    const double millis = std::stod(value, &consumed);
+    if (consumed == value.size()) {
+      return millis / 1000.0;
+    }
+  } catch (const std::exception&) {
+    // std::stod throws on malformed or out-of-range input; report it below.
+  }
+  return InvalidArgument("Invalid value '{}' for S3 property '{}'", value, key);
+}
+
+/// \brief Configure S3Options from a properties map.
+///
+/// \param properties The configuration properties map.
+/// \return Configured S3Options, or InvalidArgument if a timeout is malformed.
+Result<::arrow::fs::S3Options> ConfigureS3Options(
+    const std::unordered_map<std::string, std::string>& properties) {
+  ::arrow::fs::S3Options options;
+
+  // Configure credentials
+  auto access_key_it = properties.find(S3Properties::kAccessKeyId);
+  auto secret_key_it = properties.find(S3Properties::kSecretAccessKey);
+  auto session_token_it = properties.find(S3Properties::kSessionToken);
+
+  if (access_key_it != properties.end() && secret_key_it != properties.end()) {
+    if (session_token_it != properties.end()) {
+      options.ConfigureAccessKey(access_key_it->second, secret_key_it->second,
+                                 session_token_it->second);
+    } else {
+      options.ConfigureAccessKey(access_key_it->second, secret_key_it->second);
+    }
+  } else {
+    // Use default credential chain (environment, instance profile, etc.)
+    options.ConfigureDefaultCredentials();
+  }
+
+  // Configure region
+  auto region_it = properties.find(S3Properties::kRegion);
+  if (region_it != properties.end()) {
+    options.region = region_it->second;
+  }
+
+  // Configure SSL
+  auto ssl_it = properties.find(S3Properties::kSslEnabled);
+  if (ssl_it != properties.end() && ssl_it->second == "false") {
+    options.scheme = "http";
+  }
+
+  // Configure endpoint (for MinIO, LocalStack, etc.). S3Options keeps the host in
+  // endpoint_override and the transport in scheme, so split a full URL apart.
+  auto endpoint_it = properties.find(S3Properties::kEndpoint);
+  if (endpoint_it != properties.end()) {
+    std::string_view endpoint = endpoint_it->second;
+    if (endpoint.starts_with("http://")) {
+      options.scheme = "http";
+      endpoint.remove_prefix(7);
+    } else if (endpoint.starts_with("https://")) {
+      options.scheme = "https";
+      endpoint.remove_prefix(8);
+    }
+    options.endpoint_override = std::string(endpoint);
+  }
+
+  // NOTE: S3Properties::kPathStyleAccess is not mapped to any S3Options field yet.
+
+  // Configure timeouts (properties are in milliseconds, S3Options uses seconds)
+  auto connect_timeout_it = properties.find(S3Properties::kConnectTimeoutMs);
+  if (connect_timeout_it != properties.end()) {
+    ICEBERG_ASSIGN_OR_RAISE(options.connect_timeout,
+                            ParseTimeoutSeconds(connect_timeout_it->first,
+                                                connect_timeout_it->second));
+  }
+
+  auto socket_timeout_it = properties.find(S3Properties::kSocketTimeoutMs);
+  if (socket_timeout_it != properties.end()) {
+    ICEBERG_ASSIGN_OR_RAISE(options.request_timeout,
+                            ParseTimeoutSeconds(socket_timeout_it->first,
+                                                socket_timeout_it->second));
+  }
+
+  return options;
+}
+
+/// \brief Create an S3 FileSystem with the given options.
+///
+/// \param options The S3Options to use.
+/// \return A shared_ptr to the S3FileSystem, or an error.
+Result<std::shared_ptr<::arrow::fs::FileSystem>> MakeS3FileSystem(
+    const ::arrow::fs::S3Options& options) {
+  ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::S3FileSystem::Make(options));
+  return fs;
+}
+#endif
+
+Result<std::shared_ptr<::arrow::fs::FileSystem>> ResolveFileSystemFromUri(
+    const std::string& uri, std::string* out_path) {
+  if (IsS3Uri(uri)) {
+    ICEBERG_RETURN_UNEXPECTED(EnsureS3Initialized());
+  }
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto fs, ::arrow::fs::FileSystemFromUri(uri, out_path));
+  return fs;
+}
+
+/// \brief ArrowUriFileIO resolves FileSystem from URI for each operation.
+///
+/// This implementation is thread-safe as it creates a new FileSystem instance
+/// for each operation. However, it may be less efficient than caching the
+/// FileSystem. S3 initialization is done once per process.
+class ArrowUriFileIO : public FileIO {
+ public:
+  Result<std::string> ReadFile(const std::string& file_location,
+                               std::optional<size_t> length) override {
+    std::string path;
+    ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
+    ::arrow::fs::FileInfo file_info(path);
+    if (length.has_value()) {
+      file_info.set_size(length.value());
+    }
+    std::string content;
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenInputFile(file_info));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file_size, file->GetSize());
+
+    content.resize(file_size);
+    size_t remain = file_size;
+    size_t offset = 0;
+    while (remain > 0) {
+      size_t read_length = std::min(remain, static_cast<size_t>(1024 * 1024));
+      ICEBERG_ARROW_ASSIGN_OR_RETURN(
+          auto read_bytes,
+          file->Read(read_length, reinterpret_cast<uint8_t*>(&content[offset])));
+      if (read_bytes <= 0) {
+        // A zero-length read means end of file. Without this check a file shorter
+        // than the expected size would keep the loop spinning forever.
+        return IOError("Unexpected end of file reading '{}': got {} of {} bytes",
+                       file_location, offset, file_size);
+      }
+      remain -= read_bytes;
+      offset += read_bytes;
+    }
+
+    return content;
+  }
+
+  Status WriteFile(const std::string& file_location,
+                   std::string_view content) override {
+    std::string path;
+    ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, fs->OpenOutputStream(path));
+    ICEBERG_ARROW_RETURN_NOT_OK(file->Write(content.data(), content.size()));
+    ICEBERG_ARROW_RETURN_NOT_OK(file->Flush());
+    ICEBERG_ARROW_RETURN_NOT_OK(file->Close());
+    return {};
+  }
+
+  Status DeleteFile(const std::string& file_location) override {
+    std::string path;
+    ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(file_location, &path));
+    ICEBERG_ARROW_RETURN_NOT_OK(fs->DeleteFile(path));
+    return {};
+  }
+};
+
+}  // namespace
+
+Result<std::unique_ptr<FileIO>> MakeS3FileIO(
+    const std::string& uri,
+    const std::unordered_map<std::string, std::string>& properties) {
+  if (!IsS3Uri(uri)) {
+    return InvalidArgument("S3 URI must start with s3://");
+  }
+#if !ICEBERG_ARROW_HAS_S3
+  return NotImplemented("Arrow S3 support is not enabled");
+#else
+  // If properties are empty, use the simple URI-based resolution
+  if (properties.empty()) {
+    // Validate that S3 can be initialized and the URI is valid
+    std::string path;
+    ICEBERG_ASSIGN_OR_RAISE(auto fs, ResolveFileSystemFromUri(uri, &path));
+    (void)path;
+    (void)fs;
+    return std::make_unique<ArrowUriFileIO>();
+  }
+
+  // Create S3FileSystem with explicit configuration
+  ICEBERG_ASSIGN_OR_RAISE(auto options, ConfigureS3Options(properties));
+  ICEBERG_ASSIGN_OR_RAISE(auto fs, MakeS3FileSystem(options));
+
+  // Return ArrowFileSystemFileIO with the configured S3 filesystem
+  return std::make_unique<ArrowFileSystemFileIO>(std::move(fs));
+#endif
+}
+
+}  // namespace iceberg::arrow
diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc
index cc052e241..6dcbe6894 100644
--- a/src/iceberg/catalog/rest/rest_catalog.cc
+++ b/src/iceberg/catalog/rest/rest_catalog.cc
@@ -27,6 +27,7 @@
 #include
 
 #include "iceberg/catalog/rest/catalog_properties.h"
+#include "iceberg/file_io_registry.h"
 #include "iceberg/catalog/rest/constant.h"
 #include "iceberg/catalog/rest/endpoint.h"
 #include "iceberg/catalog/rest/error_handlers.h"
@@ -144,6 +145,40 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
       std::move(endpoints)));
 }
 
+Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
+    const RestCatalogProperties& config) {
+  // Get warehouse location to determine the appropriate FileIO type
+  auto warehouse = config.Get(RestCatalogProperties::kWarehouse);
+  if (warehouse.empty()) {
+    return InvalidArgument(
+        "Warehouse location is required when FileIO is not explicitly provided. "
+        "Set the 'warehouse' property to an S3 URI (s3://...)
or local path.");
+  }
+
+  // Check for user-specified io-impl property (configs() is read once on purpose)
+  const auto& configs = config.configs();
+  const auto io_impl = configs.find(FileIOProperties::kImpl);
+  std::string impl_name;
+  if (io_impl != configs.end() && !io_impl->second.empty()) {
+    // User specified a custom io-impl
+    impl_name = io_impl->second;
+  } else {
+    // Use default based on warehouse URI scheme
+    if (warehouse.rfind("s3://", 0) == 0) {
+      impl_name = FileIORegistry::kArrowS3FileIO;
+    } else {
+      impl_name = FileIORegistry::kArrowLocalFileIO;
+    }
+  }
+
+  // Load FileIO from registry
+  ICEBERG_ASSIGN_OR_RAISE(auto file_io,
+                          FileIORegistry::Load(impl_name, warehouse, configs));
+
+  // Call the main Make method with the created FileIO
+  return Make(config, std::move(file_io));
+}
+
 RestCatalog::RestCatalog(std::unique_ptr<RestCatalogProperties> config,
                          std::shared_ptr<FileIO> file_io,
                          std::unique_ptr<ResourcePaths> paths,
diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h
index 721df29d8..47a485c63 100644
--- a/src/iceberg/catalog/rest/rest_catalog.h
+++ b/src/iceberg/catalog/rest/rest_catalog.h
@@ -53,6 +53,30 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
   static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config,
                                                    std::shared_ptr<FileIO> file_io);
 
+  /// \brief Create a RestCatalog instance with auto-detected FileIO.
+  ///
+  /// This overload automatically creates an appropriate FileIO based on the "io-impl"
+  /// property or the warehouse location URI scheme.
+  ///
+  /// FileIO selection logic:
+  /// 1. If "io-impl" property is set, use the specified implementation from FileIORegistry.
+  /// 2.
Otherwise, auto-detect based on warehouse URI:
+  ///    - "s3://" -> ArrowS3FileIO
+  ///    - Local path -> ArrowLocalFileIO
+  ///
+  /// Users can register custom FileIO implementations via FileIORegistry::Register():
+  /// \code
+  /// FileIORegistry::Register("com.mycompany.MyFileIO",
+  ///     [](const std::string& warehouse, const auto& props) {
+  ///         return std::make_shared<MyFileIO>(warehouse, props);
+  ///     });
+  /// \endcode
+  ///
+  /// \param config the configuration for the RestCatalog, including warehouse location
+  ///        and optional "io-impl" property
+  /// \return a shared_ptr to RestCatalog instance, or an error if FileIO creation fails
+  static Result<std::shared_ptr<RestCatalog>> Make(const RestCatalogProperties& config);
+
   std::string_view name() const override;
 
   Result<std::vector<Namespace>> ListNamespaces(const Namespace& ns) const override;
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index ae61d819f..91d857e98 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -112,6 +112,7 @@ add_iceberg_test(util_test
                  data_file_set_test.cc
                  decimal_test.cc
                  endian_test.cc
+                 file_io_registry_test.cc
                  formatter_test.cc
                  location_util_test.cc
                  string_util_test.cc
@@ -136,6 +137,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    USE_BUNDLE
                    SOURCES
                    arrow_fs_file_io_test.cc
+                   arrow_s3_file_io_test.cc
                    arrow_test.cc
                    gzip_decompress_test.cc
                    metadata_io_test.cc
diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc
new file mode 100644
index 000000000..735e12067
--- /dev/null
+++ b/src/iceberg/test/arrow_s3_file_io_test.cc
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdlib>
+#include <string>
+#include <unordered_map>
+
+#if __has_include(<arrow/filesystem/s3fs.h>)
+#include <arrow/filesystem/s3fs.h>
+#endif
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/arrow/s3_properties.h"
+#include "iceberg/test/matchers.h"
+
+namespace iceberg::arrow {
+
+#if __has_include(<arrow/filesystem/s3fs.h>)
+namespace {
+class ArrowS3Environment final : public ::testing::Environment {
+ public:
+  void TearDown() override { (void)::arrow::fs::EnsureS3Finalized(); }
+};
+}  // namespace
+#endif
+
+TEST(ArrowS3FileIOTest, RejectsNonS3Uri) {
+  auto result = MakeS3FileIO("file:///tmp/not-s3");
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("s3://"));
+}
+
+#if __has_include(<arrow/filesystem/s3fs.h>)
+TEST(ArrowS3FileIOTest, RequiresS3SupportAtBuildTime) {
+  auto result = MakeS3FileIO("s3://bucket/path");
+  if (!result.has_value()) {
+    EXPECT_NE(result.error().kind, ErrorKind::kNotImplemented);
+  }
+}
+#else
+TEST(ArrowS3FileIOTest, RequiresS3SupportAtBuildTime) {
+  auto result = MakeS3FileIO("s3://warehouse/iceberg_example");
+  EXPECT_THAT(result, IsError(ErrorKind::kNotImplemented));
+}
+#endif
+
+TEST(ArrowS3FileIOTest, ReadWriteFile) {
+  const char* base_uri = std::getenv("ICEBERG_TEST_S3_URI");
+  if (base_uri == nullptr || std::string(base_uri).empty()) {
+    GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test";
+  }
+
+  auto io_res = MakeS3FileIO(base_uri);
+  if (!io_res.has_value()) {
+    if (io_res.error().kind == ErrorKind::kNotImplemented) {
+      GTEST_SKIP() << "Arrow S3 support is not enabled";
+    }
+    FAIL() << "MakeS3FileIO failed: " << io_res.error().message;
+  }
+
+  auto io = std::move(io_res.value());
+  std::string object_uri = base_uri;
+  if (!object_uri.ends_with('/')) {
+    object_uri += '/';
+  }
+  object_uri += "iceberg_s3_io_test.txt";
+  auto write_res = io->WriteFile(object_uri, "hello s3");
+  ASSERT_THAT(write_res, IsOk());
+
+  auto read_res = io->ReadFile(object_uri, std::nullopt);
+  ASSERT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello s3")));
+
+  auto del_res = io->DeleteFile(object_uri);
+  EXPECT_THAT(del_res, IsOk());
+}
+
+// ============================================================================
+// Tests for MakeS3FileIO with properties
+// ============================================================================
+
+TEST(ArrowS3FileIOTest, MakeS3FileIOWithPropertiesRejectsNonS3Uri) {
+  std::unordered_map<std::string, std::string> properties;
+  auto result = MakeS3FileIO("file:///tmp/not-s3", properties);
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("s3://"));
+}
+
+TEST(ArrowS3FileIOTest, MakeS3FileIOWithEmptyPropertiesFallsBack) {
+  const char* base_uri = std::getenv("ICEBERG_TEST_S3_URI");
+  if (base_uri == nullptr || std::string(base_uri).empty()) {
+    GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test";
+  }
+
+  // Empty properties should fall back to URI-based resolution
+  std::unordered_map<std::string, std::string> properties;
+  auto io_res = MakeS3FileIO(base_uri, properties);
+  if (!io_res.has_value()) {
+    if (io_res.error().kind == ErrorKind::kNotImplemented) {
+      GTEST_SKIP() << "Arrow S3 support is not enabled";
+    }
+    FAIL() << "MakeS3FileIO failed: " << io_res.error().message;
+  }
+
+  EXPECT_NE(io_res.value(), nullptr);
+}
+
+TEST(ArrowS3FileIOTest, MakeS3FileIOWithProperties) {
+  const char* base_uri = std::getenv("ICEBERG_TEST_S3_URI");
+  const char* access_key = std::getenv("AWS_ACCESS_KEY_ID");
+  const char* secret_key = std::getenv("AWS_SECRET_ACCESS_KEY");
+  const char* endpoint = std::getenv("ICEBERG_TEST_S3_ENDPOINT");
+  const char* region = std::getenv("AWS_REGION");
+
+  // The CI workflow exports the standard AWS variable names, so fall back to
+  // them when the test-specific overrides are not set.
+  if (endpoint == nullptr || *endpoint == '\0') {
+    endpoint = std::getenv("AWS_ENDPOINT_URL");
+  }
+  if (region == nullptr || *region == '\0') {
+    region = std::getenv("AWS_DEFAULT_REGION");
+  }
+
+  if (base_uri == nullptr || std::string(base_uri).empty()) {
+    GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test";
+  }
+
+  std::unordered_map<std::string, std::string> properties;
+
+  // Configure credentials if available
+  if (access_key != nullptr && secret_key != nullptr) {
+    properties[S3Properties::kAccessKeyId] = access_key;
+    properties[S3Properties::kSecretAccessKey] = secret_key;
+  }
+
+  // Configure endpoint if available (for MinIO, LocalStack, etc.). The variable may
+  // hold a full URL, so pass host[:port] and express plain HTTP via the SSL flag.
+  if (endpoint != nullptr && std::string(endpoint).length() > 0) {
+    std::string endpoint_value = endpoint;
+    if (endpoint_value.starts_with("http://")) {
+      properties[S3Properties::kSslEnabled] = "false";
+      endpoint_value.erase(0, 7);
+    } else if (endpoint_value.starts_with("https://")) {
+      endpoint_value.erase(0, 8);
+    }
+    properties[S3Properties::kEndpoint] = endpoint_value;
+  }
+
+  // Configure region if available
+  if (region != nullptr && std::string(region).length() > 0) {
+    properties[S3Properties::kRegion] = region;
+  }
+
+  auto io_res = MakeS3FileIO(base_uri, properties);
+  if (!io_res.has_value()) {
+    if (io_res.error().kind == ErrorKind::kNotImplemented) {
+      GTEST_SKIP() << "Arrow S3 support is not enabled";
+    }
+    FAIL() << "MakeS3FileIO failed: " << io_res.error().message;
+  }
+
+  auto io = std::move(io_res.value());
+  std::string object_uri = base_uri;
+  if (!object_uri.ends_with('/')) {
+    object_uri += '/';
+  }
+  object_uri += "iceberg_s3_io_props_test.txt";
+
+  auto write_res = io->WriteFile(object_uri, "hello s3 with properties");
+  ASSERT_THAT(write_res, IsOk());
+
+  auto read_res = io->ReadFile(object_uri, std::nullopt);
+  ASSERT_THAT(read_res, IsOk());
+  EXPECT_THAT(read_res, HasValue(::testing::Eq("hello s3 with properties")));
+
+  auto del_res = io->DeleteFile(object_uri);
+  EXPECT_THAT(del_res, IsOk());
+}
+
+TEST(ArrowS3FileIOTest, MakeS3FileIOWithSslDisabled) {
+  const char* base_uri = std::getenv("ICEBERG_TEST_S3_URI");
+  if (base_uri == nullptr || std::string(base_uri).empty()) {
+    GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test";
+  }
+
+  std::unordered_map<std::string, std::string> properties;
+  properties[S3Properties::kSslEnabled] = "false";
+
+  // Just test that the configuration is accepted
+  auto io_res = MakeS3FileIO(base_uri, properties);
+  if (!io_res.has_value()) {
+    if (io_res.error().kind == ErrorKind::kNotImplemented) {
+      GTEST_SKIP() << "Arrow S3 support is not enabled";
+    }
+    // Other errors are acceptable - just checking config parsing works
+  }
+}
+
+TEST(ArrowS3FileIOTest, MakeS3FileIOWithTimeouts) {
+  const char* base_uri = std::getenv("ICEBERG_TEST_S3_URI");
+  if (base_uri == nullptr || std::string(base_uri).empty()) {
+    GTEST_SKIP() << "Set ICEBERG_TEST_S3_URI to enable S3 IO test";
+  }
+
+  std::unordered_map<std::string, std::string> properties;
+  properties[S3Properties::kConnectTimeoutMs] = "5000";
+  properties[S3Properties::kSocketTimeoutMs] = "10000";
+
+  auto io_res = MakeS3FileIO(base_uri, properties);
+  if (!io_res.has_value()) {
+    if (io_res.error().kind == ErrorKind::kNotImplemented) {
+      GTEST_SKIP() << "Arrow S3 support is not enabled";
+    }
+    // Other errors are acceptable - just checking config parsing works
+  }
+}
+
+}  // namespace iceberg::arrow
+
+#if __has_include(<arrow/filesystem/s3fs.h>)
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  ::testing::AddGlobalTestEnvironment(new iceberg::arrow::ArrowS3Environment());
+  return RUN_ALL_TESTS();
+}
+#endif
diff --git a/src/iceberg/test/rest_catalog_test.cc b/src/iceberg/test/rest_catalog_test.cc
index 20560979b..8cda022a0 100644
--- a/src/iceberg/test/rest_catalog_test.cc
+++ b/src/iceberg/test/rest_catalog_test.cc
@@ -40,6 +40,7 @@
 #include "iceberg/catalog/rest/error_handlers.h"
 #include "iceberg/catalog/rest/http_client.h"
 #include "iceberg/catalog/rest/json_serde_internal.h"
+#include "iceberg/file_io_registry.h"
 #include "iceberg/partition_spec.h"
 #include "iceberg/result.h"
 #include "iceberg/schema.h"
@@ -687,4 +688,89 @@ TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
   EXPECT_EQ(props.at("key1"), "value1");
 }
 
+// ============================================================================
+// Tests for RestCatalog::Make(config) with auto-detected FileIO
+//
============================================================================
+
+TEST_F(RestCatalogIntegrationTest, MakeWithoutWarehouseReturnsError) {
+  // Build a config that has a URI and a name but deliberately no warehouse
+  auto config = RestCatalogProperties::default_properties();
+  config
+      ->Set(RestCatalogProperties::kUri,
+            std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
+      .Set(RestCatalogProperties::kName, std::string(kCatalogName));
+  // Note: warehouse is NOT set, which Make(config) rejects before creating any FileIO
+
+  auto result = RestCatalog::Make(*config);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("Warehouse location is required"));
+}
+
+TEST_F(RestCatalogIntegrationTest, MakeWithUnregisteredIoImplReturnsError) {
+  auto config = RestCatalogProperties::default_properties();
+  config
+      ->Set(RestCatalogProperties::kUri,
+            std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
+      .Set(RestCatalogProperties::kName, std::string(kCatalogName))
+      .Set(RestCatalogProperties::kWarehouse, "/local/warehouse")
+      .Set(FileIOProperties::kImpl, "com.nonexistent.FileIO");
+
+  auto result = RestCatalog::Make(*config);
+
+  // Should fail because no factory was registered under the requested io-impl name
+  EXPECT_THAT(result, IsError(ErrorKind::kNotFound));
+  EXPECT_THAT(result, HasErrorMessage("FileIO implementation not found"));
+}
+
+TEST_F(RestCatalogIntegrationTest, MakeWithAutoDetectedLocalFileIO) {
+  // Register a factory under the key Make(config) picks for non-"s3://" warehouses
+  FileIORegistry::Register(
+      FileIORegistry::kArrowLocalFileIO,
+      [](const std::string& /*warehouse*/,
+         const std::unordered_map& /*properties*/)
+          -> Result> {
+        return std::make_shared();
+      });
+
+  auto config = RestCatalogProperties::default_properties();
+  config
+      ->Set(RestCatalogProperties::kUri,
+            std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
+      .Set(RestCatalogProperties::kName, std::string(kCatalogName))
+      .Set(RestCatalogProperties::kWarehouse, "/local/warehouse");
+
+  auto catalog_result = RestCatalog::Make(*config);
+  ASSERT_THAT(catalog_result, IsOk());
+
+  auto& catalog = catalog_result.value();
+  EXPECT_EQ(catalog->name(), kCatalogName);
+}
+
+TEST_F(RestCatalogIntegrationTest, MakeWithCustomIoImpl) {
+  // Register a factory under a custom name and select it through the io-impl property
+  const std::string custom_impl = "com.mycompany.CustomFileIO";
+  FileIORegistry::Register(
+      custom_impl,
+      [](const std::string& /*warehouse*/,
+         const std::unordered_map& /*properties*/)
+          -> Result> {
+        return std::make_shared();
+      });
+
+  auto config = RestCatalogProperties::default_properties();
+  config
+      ->Set(RestCatalogProperties::kUri,
+            std::format("{}:{}", kLocalhostUri, kRestCatalogPort))
+      .Set(RestCatalogProperties::kName, std::string(kCatalogName))
+      .Set(RestCatalogProperties::kWarehouse, "/any/warehouse")
+      .Set(FileIOProperties::kImpl, custom_impl);
+
+  auto catalog_result = RestCatalog::Make(*config);
+  ASSERT_THAT(catalog_result, IsOk());
+
+  auto& catalog = catalog_result.value();
+  EXPECT_EQ(catalog->name(), kCatalogName);
+}
+
 }  // namespace iceberg::rest