diff --git a/Cargo.lock b/Cargo.lock index ef7c147..e9af7be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2153,7 +2153,9 @@ dependencies = [ "aws-credential-types", "aws-sdk-s3", "aws-sigv4", + "aws-smithy-runtime-api", "aws-smithy-types", + "bytes", "futures", "hex", "http 1.4.0", diff --git a/Cargo.toml b/Cargo.toml index 4951b9b..7b77ece 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ aws-sdk-s3 = "1.119" aws-config = { version = "1.8", features = ["behavior-version-latest"] } aws-credential-types = "1.2" aws-smithy-types = "1.3" +aws-smithy-runtime-api = "1.9" # CLI clap = { version = "4.5", features = ["derive", "env"] } @@ -54,6 +55,7 @@ console = "0.16" dirs = "6.0" jiff = { version = "0.2", features = ["serde"] } humansize = "2.1" +bytes = "1.11" url = "2.5" futures = "0.3" async-trait = "0.1" diff --git a/crates/s3/Cargo.toml b/crates/s3/Cargo.toml index 4adc890..0b56f40 100644 --- a/crates/s3/Cargo.toml +++ b/crates/s3/Cargo.toml @@ -20,6 +20,7 @@ aws-sdk-s3.workspace = true aws-config.workspace = true aws-credential-types.workspace = true aws-smithy-types.workspace = true +aws-smithy-runtime-api.workspace = true # Async tokio.workspace = true @@ -35,6 +36,7 @@ tracing.workspace = true # Utilities jiff.workspace = true +bytes.workspace = true url.workspace = true # HTTP client for Admin API diff --git a/crates/s3/src/client.rs b/crates/s3/src/client.rs index 707a895..22944d3 100644 --- a/crates/s3/src/client.rs +++ b/crates/s3/src/client.rs @@ -3,13 +3,153 @@ //! Wraps aws-sdk-s3 and implements the ObjectStore trait from rc-core. use async_trait::async_trait; - +use aws_smithy_runtime_api::client::http::{ + HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector, +}; +use aws_smithy_runtime_api::client::orchestrator::HttpRequest; +use aws_smithy_runtime_api::client::result::ConnectorError; +use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents; +use aws_smithy_runtime_api::http::{Response, StatusCode}; +use aws_smithy_types::body::SdkBody; +use bytes::Bytes; use jiff::Timestamp; use rc_core::{ Alias, Capabilities, Error, ListOptions, ListResult, ObjectInfo, ObjectStore, ObjectVersion, RemotePath, Result, }; +/// Custom HTTP connector using reqwest, supporting insecure TLS (skip cert verification) +/// and custom CA bundles. Used when `alias.insecure = true` or `alias.ca_bundle.is_some()`. +#[derive(Debug, Clone)] +struct ReqwestConnector { + client: reqwest::Client, +} + +impl ReqwestConnector { + async fn new(insecure: bool, ca_bundle: Option<&str>) -> Result { + // NOTE: When `insecure = true`, `danger_accept_invalid_certs` disables all TLS + // certificate verification. Any CA bundle provided will still be added to the + // trust store but is rendered ineffective for this connection. + let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure); + + if let Some(bundle_path) = ca_bundle { + // Use tokio::fs::read to avoid blocking the async runtime thread. + let pem = tokio::fs::read(bundle_path).await.map_err(|e| { + Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}")) + })?; + let cert = reqwest::Certificate::from_pem(&pem) + .map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?; + builder = builder.add_root_certificate(cert); + } + + let client = builder + .build() + .map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?; + Ok(Self { client }) + } +} + +impl HttpConnector for ReqwestConnector { + fn call(&self, request: HttpRequest) -> HttpConnectorFuture { + let client = self.client.clone(); + HttpConnectorFuture::new(async move { + // Extract request parts before consuming the request + let uri = request.uri().to_string(); + let method_str = request.method().to_string(); + let headers = request.headers().clone(); + + // Try to get the body as buffered in-memory bytes. + // For streaming bodies (e.g., large file uploads), bytes() returns None and we + // return a clear error rather than silently sending an empty body, which would + // cause signature mismatches or server-side failures. + let body_bytes = match request.body().bytes() { + Some(b) => Bytes::copy_from_slice(b), + None => { + return Err(ConnectorError::user( + "Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \ + use in-memory data for uploads with this connector" + .into(), + )); + } + }; + + // Build reqwest method + let method = reqwest::Method::from_bytes(method_str.as_bytes()) + .map_err(|e| ConnectorError::user(Box::new(e)))?; + + // Build reqwest URL + let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?; + + // Build reqwest request + let mut req = reqwest::Request::new(method, url); + + // Copy headers; S3 headers are all ASCII so failures here are unexpected + for (name, value) in headers.iter() { + match ( + reqwest::header::HeaderName::from_bytes(name.as_bytes()), + reqwest::header::HeaderValue::from_bytes(value.as_bytes()), + ) { + (Ok(header_name), Ok(header_value)) => { + req.headers_mut().append(header_name, header_value); + } + _ => { + tracing::warn!("Skipping non-convertible request header: {}", name); + } + } + } + + // Set body + *req.body_mut() = Some(reqwest::Body::from(body_bytes)); + + // Execute + let resp = client + .execute(req) + .await + .map_err(|e| ConnectorError::io(Box::new(e)))?; + + // Convert response + let status = StatusCode::try_from(resp.status().as_u16()) + .map_err(|e| ConnectorError::other(Box::new(e), None))?; + let resp_headers = resp.headers().clone(); + let body = resp + .bytes() + .await + .map_err(|e| ConnectorError::io(Box::new(e)))?; + + let mut sdk_response = Response::new(status, SdkBody::from(body)); + for (name, value) in &resp_headers { + match value.to_str() { + Ok(value_str) => { + sdk_response + .headers_mut() + .append(name.as_str().to_owned(), value_str.to_owned()); + } + Err(_) => { + tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str()); + } + } + } + + Ok(sdk_response) + }) + } +} + +impl HttpClient for ReqwestConnector { + fn http_connector( + &self, + _settings: &HttpConnectorSettings, + _components: &RuntimeComponents, + ) -> SharedHttpConnector { + // NOTE: `ReqwestConnector` is preconfigured (e.g., insecure/CA-bundle options) when it + // is constructed, and does not currently apply `HttpConnectorSettings`. This means + // behavior in this mode may differ from the default connector w.r.t. SDK HTTP settings. + // If alignment is required, map relevant fields from `HttpConnectorSettings` onto the + // internal `reqwest::Client` when constructing the connector. + SharedHttpConnector::new(self.clone()) + } +} + /// S3 client wrapper pub struct S3Client { inner: aws_sdk_s3::Client, @@ -34,13 +174,21 @@ impl S3Client { "rc-static-credentials", ); - // Build SDK config - let config = aws_config::defaults(aws_config::BehaviorVersion::latest()) + // Build SDK config loader + let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest()) .credentials_provider(credentials) .region(aws_config::Region::new(region)) - .endpoint_url(&endpoint) - .load() - .await; + .endpoint_url(&endpoint); + + // When insecure mode is enabled or a custom CA bundle is provided, use the reqwest + // connector which supports danger_accept_invalid_certs and custom root certificates. + if alias.insecure || alias.ca_bundle.is_some() { + let connector = + ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref()).await?; + config_loader = config_loader.http_client(connector); + } + + let config = config_loader.load().await; // Build S3 client with path-style addressing for compatibility let s3_config = aws_sdk_s3::config::Builder::from(&config) @@ -736,4 +884,29 @@ mod tests { assert_eq!(info.key, "test.txt"); assert_eq!(info.size_bytes, Some(1024)); } + + #[tokio::test] + async fn reqwest_connector_insecure_without_ca_bundle_succeeds() { + // When insecure is true and no CA bundle is provided, the connector should be created. + let connector = ReqwestConnector::new(true, None).await; + assert!( + connector.is_ok(), + "Expected insecure connector creation to succeed" + ); + } + + #[tokio::test] + async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() { + // Use an obviously invalid path (empty string) to trigger a read error. + let result = ReqwestConnector::new(false, Some("")).await; + match result { + Err(Error::Network(msg)) => { + assert!( + msg.contains("Failed to read CA bundle"), + "Unexpected error message: {msg}" + ); + } + other => panic!("Expected Error::Network for invalid path, got: {:?}", other), + } + } }