Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,15 @@ impl S3CompatibleObjectStorage {
payload: Box<dyn crate::PutPayload>,
len: u64,
) -> Result<(), Retry<StorageError>> {
// For MD5 uploads, compute Content-MD5 before streaming the body.
// The AWS SDK no-ops ChecksumAlgorithm::Md5, so MD5 must be sent via
// the legacy Content-MD5 header (same as the multipart path does per part).
let content_md5: Option<String> = self
.maybe_compute_part_md5(payload.as_ref(), 0..len)
.await
.map_err(|err| Retry::Permanent(StorageError::from(err)))?
.map(|digest| BASE64_STANDARD.encode(digest.0));

let body = payload
.byte_stream()
.await
Expand All @@ -338,6 +347,7 @@ impl S3CompatibleObjectStorage {
.body(body)
.content_length(len as i64)
.set_checksum_algorithm(aws_checksum_algorithm(self.checksum_algorithm))
.set_content_md5(content_md5)
.send()
.await
.map_err(|sdk_error| {
Expand Down Expand Up @@ -1305,4 +1315,77 @@ mod tests {
.await
.unwrap();
}

/// Builds an `S3CompatibleObjectStorage` for tests whose S3 client uses `client` as its
/// HTTP client.
///
/// The storage points at `s3://bucket/` with an empty prefix, uses mock credentials in the
/// `us-east-1` region, and sets `disable_multipart_upload` to `true`. The caller chooses
/// the `checksum_algorithm`.
fn make_s3_storage_with_replay(
    client: StaticReplayClient,
    checksum_algorithm: quickwit_config::ChecksumAlgorithm,
) -> S3CompatibleObjectStorage {
    let mock_credentials =
        Credentials::new("mock_key", "mock_secret", None, None, "mock_provider");
    let sdk_config = aws_sdk_s3::Config::builder()
        .behavior_version(aws_behavior_version())
        .region(Some(Region::new("us-east-1")))
        .http_client(client)
        .credentials_provider(mock_credentials)
        .build();
    let s3_client = S3Client::from_conf(sdk_config);

    S3CompatibleObjectStorage {
        s3_client,
        uri: Uri::for_test("s3://bucket/"),
        bucket: "bucket".to_string(),
        prefix: PathBuf::new(),
        multipart_policy: MultiPartPolicy::default(),
        retry_params: RetryParams::for_test(),
        disable_multi_object_delete: false,
        disable_multipart_upload: true,
        checksum_algorithm,
    }
}

/// Returns a replay event that pairs an empty placeholder request with an empty-bodied
/// HTTP 200 response.
fn ok_put_response() -> ReplayEvent {
    let placeholder_request = http::Request::builder().body(SdkBody::empty()).unwrap();
    let ok_response = http::Response::builder()
        .status(200)
        .body(SdkBody::empty())
        .unwrap();
    ReplayEvent::new(placeholder_request, ok_response)
}

/// With the MD5 checksum algorithm, the single `put` request must carry a `content-md5`
/// header equal to the base64-encoded MD5 digest of the payload.
#[tokio::test]
async fn test_single_part_upload_sets_content_md5_for_md5_algorithm() {
    let body: Vec<u8> = b"hello world".to_vec();
    // Compute the expected header value up front so the payload can be moved into `put`.
    let expected_md5 = BASE64_STANDARD.encode(md5::compute(&body).0);

    let replay_client = StaticReplayClient::new(vec![ok_put_response()]);
    let storage = make_s3_storage_with_replay(
        replay_client.clone(),
        quickwit_config::ChecksumAlgorithm::Md5,
    );
    storage
        .put(Path::new("test-key"), Box::new(body))
        .await
        .unwrap();

    let sent: Vec<_> = replay_client.actual_requests().collect();
    assert_eq!(sent.len(), 1);
    let md5_header = sent[0]
        .headers()
        .get("content-md5")
        .expect("Content-MD5 header must be present for md5 checksum algorithm");
    assert_eq!(md5_header, expected_md5.as_str());
}

/// With the CRC32C checksum algorithm, the single `put` request must not carry a
/// `content-md5` header.
#[tokio::test]
async fn test_single_part_upload_no_content_md5_for_crc32c_algorithm() {
    let replay_client = StaticReplayClient::new(vec![ok_put_response()]);
    let storage = make_s3_storage_with_replay(
        replay_client.clone(),
        quickwit_config::ChecksumAlgorithm::Crc32c,
    );
    let body = b"hello world".to_vec();
    storage
        .put(Path::new("test-key"), Box::new(body))
        .await
        .unwrap();

    let sent: Vec<_> = replay_client.actual_requests().collect();
    assert_eq!(sent.len(), 1);
    let md5_header = sent[0].headers().get("content-md5");
    assert!(
        md5_header.is_none(),
        "Content-MD5 header must not be set for crc32c checksum algorithm"
    );
}
}
Loading