diff --git a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
index 19652005e1b..7ebd1fdb32e 100644
--- a/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
+++ b/quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs
@@ -323,6 +323,15 @@ impl S3CompatibleObjectStorage {
         payload: Box<dyn crate::PutPayload>,
         len: u64,
     ) -> Result<(), Retry<StorageError>> {
+        // For MD5 uploads, compute Content-MD5 before streaming the body.
+        // The AWS SDK no-ops ChecksumAlgorithm::Md5, so MD5 must be sent via
+        // the legacy Content-MD5 header (same as the multipart path does per part).
+        let content_md5: Option<String> = self
+            .maybe_compute_part_md5(payload.as_ref(), 0..len)
+            .await
+            .map_err(|err| Retry::Permanent(StorageError::from(err)))?
+            .map(|digest| BASE64_STANDARD.encode(digest.0));
+
         let body = payload
             .byte_stream()
             .await
@@ -338,6 +347,7 @@ impl S3CompatibleObjectStorage {
             .body(body)
             .content_length(len as i64)
             .set_checksum_algorithm(aws_checksum_algorithm(self.checksum_algorithm))
+            .set_content_md5(content_md5)
             .send()
             .await
             .map_err(|sdk_error| {
@@ -1305,4 +1315,77 @@ mod tests {
             .await
             .unwrap();
     }
+
+    fn make_s3_storage_with_replay(
+        client: StaticReplayClient,
+        checksum_algorithm: quickwit_config::ChecksumAlgorithm,
+    ) -> S3CompatibleObjectStorage {
+        let credentials = Credentials::new("mock_key", "mock_secret", None, None, "mock_provider");
+        let config = aws_sdk_s3::Config::builder()
+            .behavior_version(aws_behavior_version())
+            .region(Some(Region::new("us-east-1")))
+            .http_client(client)
+            .credentials_provider(credentials)
+            .build();
+        S3CompatibleObjectStorage {
+            s3_client: S3Client::from_conf(config),
+            uri: Uri::for_test("s3://bucket/"),
+            bucket: "bucket".to_string(),
+            prefix: PathBuf::new(),
+            multipart_policy: MultiPartPolicy::default(),
+            retry_params: RetryParams::for_test(),
+            disable_multi_object_delete: false,
+            disable_multipart_upload: true,
+            checksum_algorithm,
+        }
+    }
+
+    fn ok_put_response() -> ReplayEvent {
+        ReplayEvent::new(
+            http::Request::builder().body(SdkBody::empty()).unwrap(),
+            http::Response::builder()
+                .status(200)
+                .body(SdkBody::empty())
+                .unwrap(),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_single_part_upload_sets_content_md5_for_md5_algorithm() {
+        let client = StaticReplayClient::new(vec![ok_put_response()]);
+        let s3_storage =
+            make_s3_storage_with_replay(client.clone(), quickwit_config::ChecksumAlgorithm::Md5);
+        let payload: Vec<u8> = b"hello world".to_vec();
+        s3_storage
+            .put(Path::new("test-key"), Box::new(payload.clone()))
+            .await
+            .unwrap();
+
+        let requests = client.actual_requests().collect::<Vec<_>>();
+        assert_eq!(requests.len(), 1);
+        let content_md5 = requests[0]
+            .headers()
+            .get("content-md5")
+            .expect("Content-MD5 header must be present for md5 checksum algorithm");
+        let expected = BASE64_STANDARD.encode(md5::compute(&payload).0);
+        assert_eq!(content_md5, expected.as_str());
+    }
+
+    #[tokio::test]
+    async fn test_single_part_upload_no_content_md5_for_crc32c_algorithm() {
+        let client = StaticReplayClient::new(vec![ok_put_response()]);
+        let s3_storage =
+            make_s3_storage_with_replay(client.clone(), quickwit_config::ChecksumAlgorithm::Crc32c);
+        s3_storage
+            .put(Path::new("test-key"), Box::new(b"hello world".to_vec()))
+            .await
+            .unwrap();
+
+        let requests = client.actual_requests().collect::<Vec<_>>();
+        assert_eq!(requests.len(), 1);
+        assert!(
+            requests[0].headers().get("content-md5").is_none(),
+            "Content-MD5 header must not be set for crc32c checksum algorithm"
+        );
+    }
 }