Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ aws-sdk-s3 = "1.119"
aws-config = { version = "1.8", features = ["behavior-version-latest"] }
aws-credential-types = "1.2"
aws-smithy-types = "1.3"
aws-smithy-runtime-api = "1.9"

# CLI
clap = { version = "4.5", features = ["derive", "env"] }
Expand Down Expand Up @@ -54,6 +55,7 @@ console = "0.16"
dirs = "6.0"
jiff = { version = "0.2", features = ["serde"] }
humansize = "2.1"
bytes = "1.11"
url = "2.5"
futures = "0.3"
async-trait = "0.1"
Expand Down
2 changes: 2 additions & 0 deletions crates/s3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ aws-sdk-s3.workspace = true
aws-config.workspace = true
aws-credential-types.workspace = true
aws-smithy-types.workspace = true
aws-smithy-runtime-api.workspace = true

# Async
tokio.workspace = true
Expand All @@ -35,6 +36,7 @@ tracing.workspace = true

# Utilities
jiff.workspace = true
bytes.workspace = true
url.workspace = true

# HTTP client for Admin API
Expand Down
143 changes: 137 additions & 6 deletions crates/s3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,137 @@
//! Wraps aws-sdk-s3 and implements the ObjectStore trait from rc-core.

use async_trait::async_trait;

use aws_smithy_runtime_api::client::http::{
HttpClient, HttpConnector, HttpConnectorFuture, HttpConnectorSettings, SharedHttpConnector,
};
use aws_smithy_runtime_api::client::orchestrator::HttpRequest;
use aws_smithy_runtime_api::client::result::ConnectorError;
use aws_smithy_runtime_api::client::runtime_components::RuntimeComponents;
use aws_smithy_runtime_api::http::{Response, StatusCode};
use aws_smithy_types::body::SdkBody;
use bytes::Bytes;
use jiff::Timestamp;
use rc_core::{
Alias, Capabilities, Error, ListOptions, ListResult, ObjectInfo, ObjectStore, ObjectVersion,
RemotePath, Result,
};

/// Custom HTTP connector using reqwest, supporting insecure TLS (skip cert verification)
/// and custom CA bundles. Used when `alias.insecure = true`.
Comment thread
overtrue marked this conversation as resolved.
Outdated
#[derive(Debug, Clone)]
struct ReqwestConnector {
client: reqwest::Client,
}

impl ReqwestConnector {
fn new(insecure: bool, ca_bundle: Option<&str>) -> Result<Self> {
// NOTE: When `insecure = true`, `danger_accept_invalid_certs` disables all TLS
// certificate verification. Any CA bundle provided will still be added to the
// trust store but is rendered ineffective for this connection.
let mut builder = reqwest::Client::builder().danger_accept_invalid_certs(insecure);

if let Some(bundle_path) = ca_bundle {
let pem = std::fs::read(bundle_path).map_err(|e| {
Error::Network(format!("Failed to read CA bundle '{bundle_path}': {e}"))
})?;
let cert = reqwest::Certificate::from_pem(&pem)
.map_err(|e| Error::Network(format!("Invalid CA bundle '{bundle_path}': {e}")))?;
builder = builder.add_root_certificate(cert);
Comment on lines +35 to +42
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReqwestConnector::new reads the CA bundle using std::fs::read, which is a blocking filesystem call. Since this is invoked from async code (S3Client::new), consider switching to tokio::fs::read or spawn_blocking to avoid blocking the async runtime thread, especially if aliases are created on-demand.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ae82923. ReqwestConnector::new is now async fn and uses tokio::fs::read(...).await instead of the blocking std::fs::read. The call site in S3Client::new has been updated to .await? accordingly.

}

let client = builder
.build()
.map_err(|e| Error::Network(format!("Failed to build HTTP client: {e}")))?;
Ok(Self { client })
}
}

Comment thread
overtrue marked this conversation as resolved.
impl HttpConnector for ReqwestConnector {
fn call(&self, request: HttpRequest) -> HttpConnectorFuture {
let client = self.client.clone();
HttpConnectorFuture::new(async move {
// Extract request parts before consuming the request
let uri = request.uri().to_string();
let method_str = request.method().to_string();
let headers = request.headers().clone();
let body_bytes = request
.body()
.bytes()
.map(Bytes::copy_from_slice)
.unwrap_or_default();
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request.body().bytes().unwrap_or_default() silently turns any non-buffered/streaming request body into an empty body. That can break operations that use streaming bodies (e.g., future uploads/multipart) and can lead to signature mismatches or server-side errors. Prefer handling the streaming case explicitly (buffer the body, or return a ConnectorError indicating unsupported streaming) rather than defaulting to empty.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot apply changes based on this feedback

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ae82923. request.body().bytes().unwrap_or_default() is replaced with a match that returns a clear ConnectorError::user(...) when bytes() is None (streaming body), rather than silently sending an empty body.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was already addressed in ae82923. The unwrap_or_default() is replaced with a match that returns a clear ConnectorError::user(...) when bytes() is None (streaming body).


// Build reqwest method
let method = reqwest::Method::from_bytes(method_str.as_bytes())
.map_err(|e| ConnectorError::user(Box::new(e)))?;

// Build reqwest URL
let url = reqwest::Url::parse(&uri).map_err(|e| ConnectorError::user(Box::new(e)))?;

// Build reqwest request
let mut req = reqwest::Request::new(method, url);

// Copy headers; S3 headers are all ASCII so failures here are unexpected
for (name, value) in headers.iter() {
match (
reqwest::header::HeaderName::from_bytes(name.as_bytes()),
reqwest::header::HeaderValue::from_bytes(value.as_bytes()),
) {
(Ok(header_name), Ok(header_value)) => {
req.headers_mut().append(header_name, header_value);
}
_ => {
tracing::warn!("Skipping non-convertible request header: {}", name);
}
}
}

// Set body
*req.body_mut() = Some(reqwest::Body::from(body_bytes));

// Execute
let resp = client
.execute(req)
.await
.map_err(|e| ConnectorError::io(Box::new(e)))?;

// Convert response
let status = StatusCode::try_from(resp.status().as_u16())
.map_err(|e| ConnectorError::other(Box::new(e), None))?;
let resp_headers = resp.headers().clone();
let body = resp
.bytes()
.await
.map_err(|e| ConnectorError::io(Box::new(e)))?;

let mut sdk_response = Response::new(status, SdkBody::from(body));
for (name, value) in &resp_headers {
match value.to_str() {
Ok(value_str) => {
sdk_response
.headers_mut()
.append(name.as_str().to_owned(), value_str.to_owned());
}
Err(_) => {
tracing::warn!("Skipping non-UTF8 response header: {}", name.as_str());
}
}
}

Ok(sdk_response)
})
}
}

impl HttpClient for ReqwestConnector {
fn http_connector(
&self,
_settings: &HttpConnectorSettings,
_components: &RuntimeComponents,
) -> SharedHttpConnector {
Comment thread
overtrue marked this conversation as resolved.
SharedHttpConnector::new(self.clone())
}
}

/// S3 client wrapper
pub struct S3Client {
inner: aws_sdk_s3::Client,
Expand All @@ -34,13 +158,20 @@ impl S3Client {
"rc-static-credentials",
);

// Build SDK config
let config = aws_config::defaults(aws_config::BehaviorVersion::latest())
// Build SDK config loader
let mut config_loader = aws_config::defaults(aws_config::BehaviorVersion::latest())
.credentials_provider(credentials)
.region(aws_config::Region::new(region))
.endpoint_url(&endpoint)
.load()
.await;
.endpoint_url(&endpoint);

// When insecure mode is enabled or a custom CA bundle is provided, use the reqwest
// connector which supports danger_accept_invalid_certs and custom root certificates.
if alias.insecure || alias.ca_bundle.is_some() {
let connector = ReqwestConnector::new(alias.insecure, alias.ca_bundle.as_deref())?;
config_loader = config_loader.http_client(connector);
}

let config = config_loader.load().await;

// Build S3 client with path-style addressing for compatibility
let s3_config = aws_sdk_s3::config::Builder::from(&config)
Expand Down