Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ aws-sdk-s3 = "1.119"
aws-config = { version = "1.8", features = ["behavior-version-latest"] }
aws-credential-types = "1.2"
aws-smithy-types = "1.3"
aws-smithy-runtime-api = "1.9"

# CLI
clap = { version = "4.5", features = ["derive", "env"] }
Expand Down Expand Up @@ -54,6 +55,7 @@ console = "0.16"
dirs = "6.0"
jiff = { version = "0.2", features = ["serde"] }
humansize = "2.1"
bytes = "1.11"
url = "2.5"
futures = "0.3"
async-trait = "0.1"
Expand Down
2 changes: 2 additions & 0 deletions crates/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aws-sdk-s3.workspace = true
aws-config.workspace = true
aws-credential-types.workspace = true
aws-smithy-types.workspace = true
aws-smithy-runtime-api.workspace = true

# Async
tokio.workspace = true
Expand All @@ -35,6 +36,7 @@ tracing.workspace = true

# Utilities
jiff.workspace = true
bytes.workspace = true
url.workspace = true

# HTTP client for Admin API
Expand Down
185 changes: 179 additions & 6 deletions crates/s3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,153 @@
//! Wraps aws-sdk-s3 and implements the ObjectStore trait from rc-core.

use async_trait::async_trait;

use aws_smithy_runtime_api::client::http::{
HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
use aws_smithy_runtime_api::client::result::ConnectorError;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::http::{Response, StatusCode};
use aws_smithy_types::body::SdkBody;
use bytes::Bytes;
use jiff::Timestamp;
use rc_core::{
Alias, Capabilities, Error, ListOptions, ListResult, ObjectInfo, ObjectStore, ObjectVersion,
RemotePath, Result,
};

/// Custom HTTP connector using reqwest, supporting insecure TLS (skip cert verification)
/// and custom CA bundles. Used when `alias.insecure = true` or `alias.ca_bundle.is_some()`.
#[derive(Debug, Clone)]
struct ReqwestConnector {
client: reqwest::Client,
}

impl ReqwestConnector {
async fn new(insecure: bool, ca_bundle: Option<&str>) -> Result<Self> {
// NOTE: When `insecure = true`, `danger_accept_invalid_certs` disables all TLS
// certificate verification. Any CA bundle provided will still be added to the
// trust store but is rendered ineffective for this connection.
let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);

if let Some(bundle_path) = ca_bundle {
// Use tokio::fs::read to avoid blocking the async runtime thread.
let pem = tokio::fs::read(bundle_path).await.map_err(|e| {
Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
})?;
let cert = reqwest::Certificate::from_pem(&pem)
.map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
builder = builder.add_root_certificate(cert);
}

let client = builder
.build()
.map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
Ok(Self { client })
}
}

impl HttpConnector for ReqwestConnector {
fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
let client = self.client.clone();
HttpConnectorFuture::new(async move {
// Extract request parts before consuming the request
let uri = request.uri().to_string();
let method_str = request.method().to_string();
let headers = request.headers().clone();

// Try to get the body as buffered in-memory bytes.
// For streaming bodies (e.g., large file uploads), bytes() returns None and we
// return a clear error rather than silently sending an empty body, which would
// cause signature mismatches or server-side failures.
let body_bytes = match request.body().bytes() {
Some(b) => Bytes::copy_from_slice(b),
None => {
return Err(ConnectorError::user(
"Streaming request bodies are not supported in insecure/ca_bundle TLS mode; \
use in-memory data for uploads with this connector"
.into(),
));
}
};

// Build reqwest method
let method = reqwest::Method::from_bytes(method_str.as_bytes())
.map_err(|e| ConnectorError::user(Box::new(e)))?;

// Build reqwest URL
let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;

// Build reqwest request
let mut req = reqwest::Request::new(method, url);

// Copy headers; S3 headers are all ASCII so failures here are unexpected
for (name, value) in headers.iter() {
match (
reqwest::header::HeaderName::from_bytes(name.as_bytes()),
reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
) {
(Ok(header_name), Ok(header_value)) => {
req.headers_mut().append(header_name, header_value);
}
_ => {
tracing::warn!("Skipping non-convertible request header: {}", name);
}
}
}

// Set body
*req.body_mut() = Some(reqwest::Body::from(body_bytes));

// Execute
let resp = client
.execute(req)
.await
.map_err(|e| ConnectorError::io(Box::new(e)))?;

// Convert response
let status = StatusCode::try_from(resp.status().as_u16())
.map_err(|e| ConnectorError::other(Box::new(e), None))?;
let resp_headers = resp.headers().clone();
let body = resp
.bytes()
.await
.map_err(|e| ConnectorError::io(Box::new(e)))?;

let mut sdk_response = Response::new(status, SdkBody::from(body));
for (name, value) in &resp_headers {
match value.to_str() {
Ok(value_str) => {
sdk_response
.headers_mut()
.append(name.as_str().to_owned(), value_str.to_owned());
}
Err(_) => {
tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
}
}
}

Ok(sdk_response)
})
}
}

impl HttpClient for ReqwestConnector {
fn http_connector(
&self,
_settings: &HttpConnectorSettings,
_components: &RuntimeComponents,
) -> SharedHttpConnector {
// NOTE: `ReqwestConnector` is preconfigured (e.g., insecure/CA-bundle options) when it
// is constructed, and does not currently apply `HttpConnectorSettings`. This means
// behavior in this mode may differ from the default connector w.r.t. SDK HTTP settings.
// If alignment is required, map relevant fields from `HttpConnectorSettings` onto the
// internal `reqwest::Client` when constructing the connector.
SharedHttpConnector::new(self.clone())
}
}

/// S3 client wrapper
pub struct S3Client {
inner: aws_sdk_s3::Client,
Expand All @@ -34,13 +174,21 @@ impl S3Client {
"rc-static-credentials",
);

// Build SDK config
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
// Build SDK config loader
let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
.credentials_provider(credentials)
.region(aws_config::Region::new(region))
.endpoint_url(&endpoint)
.load()
.await;
.endpoint_url(&endpoint);

// When insecure mode is enabled or a custom CA bundle is provided, use the reqwest
// connector which supports danger_accept_invalid_certs and custom root certificates.
if alias.insecure || alias.ca_bundle.is_some() {
let connector =
ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref()).await?;
config_loader = config_loader.http_client(connector);
}

let config = config_loader.load().await;

// Build S3 client with path-style addressing for compatibility
let s3_config = aws_sdk_s3::config::Builder::from(&config)
Expand Down Expand Up @@ -736,4 +884,29 @@ mod tests {
assert_eq!(info.key, "test.txt");
assert_eq!(info.size_bytes, Some(1024));
}

#[tokio::test]
async fn reqwest_connector_insecure_without_ca_bundle_succeeds() {
// When insecure is true and no CA bundle is provided, the connector should be created.
let connector = ReqwestConnector::new(true, None).await;
assert!(
connector.is_ok(),
"Expected insecure connector creation to succeed"
);
}

#[tokio::test]
async fn reqwest_connector_invalid_ca_bundle_path_surfaces_error() {
// Use an obviously invalid path (empty string) to trigger a read error.
let result = ReqwestConnector::new(false, Some("")).await;
match result {
Err(Error::Network(msg)) => {
assert!(
msg.contains("Failed to read CA bundle"),
"Unexpected error message: {msg}"
);
}
other => panic!("Expected Error::Network for invalid path, got: {:?}", other),
}
}
}