Skip to content

Commit 43a2050

Browse files
authored
Buffer OCI registry uploads to S3 (#752)
Buffer OCI upload network frames into configurable S3 chunks (`registry.data_store.chunk_size`, default 5 MiB) to reduce S3 operations during Docker push.
1 parent f65ba05 commit 43a2050

25 files changed

Lines changed: 211 additions & 52 deletions

File tree

docker/bench.Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ COPY services/api/openapi.json /usr/src/bencher/services/api/openapi.json
8989

9090
WORKDIR /usr/src/bencher
9191
RUN cargo bench --package bencher_adapter --no-run --message-format=json \
92-
| jq -r 'select(.executable != null) | .executable' \
92+
| jq -r 'select(.executable != null and .target.kind == ["bench"]) | .executable' \
9393
| xargs -I {} cp {} /usr/local/bin/bench-adapter
9494

9595
FROM debian:bookworm-slim@sha256:b4aa902587c2e61ce789849cb54c332b0400fe27b1ee33af4669e1f7e7c3e22f

lib/bencher_json/src/system/config/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ pub use plus::{
2929
JsonOciBandwidth, JsonPublicRateLimiter, JsonRateLimiting, JsonRunnerRateLimiter,
3030
JsonUserRateLimiter,
3131
},
32-
registry::{DEFAULT_MAX_BODY_SIZE, DEFAULT_UPLOAD_TIMEOUT_SECS, RegistryDataStore},
32+
registry::{
33+
DEFAULT_CHUNK_SIZE, DEFAULT_MAX_BODY_SIZE, DEFAULT_UPLOAD_TIMEOUT_SECS, MAX_CHUNK_SIZE,
34+
RegistryDataStore,
35+
},
3336
runners::{DEFAULT_HEARTBEAT_TIMEOUT_SECS, DEFAULT_JOB_TIMEOUT_GRACE_PERIOD_SECS},
3437
stats::JsonStats,
3538
};

lib/bencher_json/src/system/config/plus/registry.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,26 @@ pub const DEFAULT_UPLOAD_TIMEOUT_SECS: u64 = 3600;
99
/// Default maximum body size: 1 GiB (1,073,741,824 bytes)
1010
pub const DEFAULT_MAX_BODY_SIZE: u64 = 0x4000_0000;
1111

12+
/// Default chunk size for S3 upload buffering: 5 MiB (5,242,880 bytes).
13+
///
14+
/// HTTP request bodies arrive as small network frames (typically 8–64 KB).
15+
/// Without batching, each frame would be stored as a separate S3 object,
16+
/// creating thousands of objects per layer and making both upload
17+
/// (~3 S3 ops per frame) and completion (~1 S3 `GetObject` per chunk)
18+
/// extremely slow. This value controls the minimum batch size before
19+
/// flushing to S3.
20+
///
21+
/// 5 MiB also matches the S3 multipart upload minimum part size,
22+
/// so chunks stored at this size can be efficiently assembled during
23+
/// upload completion.
24+
pub const DEFAULT_CHUNK_SIZE: u64 = 5 * 1024 * 1024;
25+
26+
/// Maximum chunk size for S3 upload buffering: 5 GiB (5,368,709,120 bytes).
27+
///
28+
/// The S3 multipart upload maximum part size is 5 GiB.
29+
/// We use that same 5 GiB limit as the upper bound.
30+
pub const MAX_CHUNK_SIZE: u64 = 5 * 1024 * 1024 * 1024;
31+
1232
/// Container registry configuration
1333
#[derive(Debug, Clone, Serialize, Deserialize)]
1434
#[cfg_attr(feature = "schema", derive(JsonSchema))]
@@ -59,6 +79,10 @@ pub enum RegistryDataStore {
5979
/// S3 Access Point ARN with optional path prefix
6080
/// Format: arn:aws:s3:<region>:<account-id>:accesspoint/<bucket>[/path]
6181
access_point: String,
82+
/// Minimum chunk size in bytes for buffering upload data before storing to S3.
83+
/// Valid range: 5 MiB–5 GiB. Defaults to 5 MiB (5,242,880 bytes).
84+
/// See [`DEFAULT_CHUNK_SIZE`] and [`MAX_CHUNK_SIZE`].
85+
chunk_size: Option<u64>,
6286
},
6387
}
6488

lib/bencher_schema/src/model/project/report/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ impl QueryReport {
154154
job: new_run_job,
155155
} = new_run_report;
156156

157-
#[cfg(feature = "plus")]
157+
#[cfg(all(feature = "plus", not(feature = "otel")))]
158+
let _ = is_claimed;
159+
#[cfg(all(feature = "plus", feature = "otel"))]
158160
let priority = plan_kind.priority(is_claimed);
159161

160162
#[cfg(all(feature = "otel", feature = "plus"))]
@@ -333,7 +335,7 @@ impl QueryReport {
333335
json_settings,
334336
#[cfg(feature = "plus")]
335337
plan_kind,
336-
#[cfg(feature = "plus")]
338+
#[cfg(all(feature = "plus", feature = "otel"))]
337339
priority,
338340
#[cfg(feature = "plus")]
339341
query_project,
@@ -434,7 +436,7 @@ impl QueryReport {
434436
adapter: Adapter,
435437
settings: JsonReportSettings,
436438
#[cfg(feature = "plus")] plan_kind: PlanKind,
437-
#[cfg(feature = "plus")] priority: bencher_json::Priority,
439+
#[cfg(all(feature = "plus", feature = "otel"))] priority: bencher_json::Priority,
438440
#[cfg(feature = "plus")] query_project: &QueryProject,
439441
) -> Result<(), HttpError> {
440442
#[cfg(feature = "plus")]

lib/bencher_schema/src/model/runner/job.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ impl QueryJob {
147147
query_report.adapter,
148148
settings,
149149
plan_kind,
150+
#[cfg(feature = "otel")]
150151
self.priority,
151152
&query_project,
152153
)

plus/api_oci/src/blobs.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ use crate::auth::{check_oci_bandwidth, record_oci_bandwidth};
4242
use crate::auth::{
4343
require_pull_access, require_push_access, resolve_project, validate_push_access,
4444
};
45+
use crate::error::storage_error;
4546
use crate::response::{
4647
APPLICATION_OCTET_STREAM, DOCKER_CONTENT_DIGEST, DOCKER_UPLOAD_UUID, oci_cors_headers,
4748
};
@@ -131,7 +132,7 @@ pub async fn oci_blob_exists(
131132
let size = storage
132133
.get_blob_size(&project_uuid, &digest)
133134
.await
134-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
135+
.map_err(storage_error)?;
135136

136137
// Build response with OCI-compliant headers (no body for HEAD)
137138
let response = oci_cors_headers(
@@ -195,7 +196,7 @@ pub async fn oci_blob_get(
195196
let (blob_body, size) = storage
196197
.get_blob_stream(&project_uuid, &digest)
197198
.await
198-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
199+
.map_err(storage_error)?;
199200

200201
// Record bandwidth usage
201202
#[cfg(feature = "plus")]
@@ -260,7 +261,7 @@ pub async fn oci_blob_delete(
260261
let exists = storage
261262
.blob_exists(&project_uuid, &digest)
262263
.await
263-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
264+
.map_err(storage_error)?;
264265
if !exists {
265266
return Err(crate::error::into_http_error(OciError::BlobUnknown {
266267
digest: digest.to_string(),
@@ -271,7 +272,7 @@ pub async fn oci_blob_delete(
271272
storage
272273
.delete_blob(&project_uuid, &digest)
273274
.await
274-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
275+
.map_err(storage_error)?;
275276

276277
// OCI spec requires 202 Accepted for DELETE
277278
let response = oci_cors_headers(
@@ -387,7 +388,7 @@ pub async fn oci_upload_start(
387388
let upload_id = storage
388389
.start_upload(&project_uuid)
389390
.await
390-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
391+
.map_err(storage_error)?;
391392

392393
// Build 202 Accepted response
393394
let location = format!("/v2/{project_slug}/blobs/uploads/{upload_id}");
@@ -460,15 +461,15 @@ pub async fn oci_upload_monolithic(
460461
let upload_id = storage
461462
.start_upload(&project_uuid)
462463
.await
463-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
464+
.map_err(storage_error)?;
464465

465466
// Stream body to storage (storage enforces max_body_size incrementally)
466467
let result = async {
467468
let final_size = crate::uploads::stream_to_storage(body, storage, &upload_id, 0).await?;
468469
let digest = storage
469470
.complete_upload(&upload_id, &expected_digest)
470471
.await
471-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
472+
.map_err(storage_error)?;
472473
Ok::<(Digest, u64), HttpError>((digest, final_size))
473474
}
474475
.await;

plus/api_oci/src/error.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ use bencher_json::oci::{OCI_ERROR_SIZE_INVALID, OCI_ERROR_UNKNOWN, oci_error_bod
44
use bencher_oci_storage::OciError;
55
use dropshot::{ClientErrorStatusCode, ErrorStatusCode, HttpError};
66

7+
/// Converts an `OciStorageError` into an `HttpError` via `OciError`
8+
pub fn storage_error(error: bencher_oci_storage::OciStorageError) -> HttpError {
9+
into_http_error(OciError::from(error))
10+
}
11+
712
/// Converts an `OciError` into an `HttpError` with OCI-compliant JSON error body
813
#[expect(
914
clippy::needless_pass_by_value,

plus/api_oci/src/manifests.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::auth::{check_oci_bandwidth, record_oci_bandwidth};
2121
use crate::auth::{
2222
require_pull_access, require_push_access, resolve_project, validate_push_access,
2323
};
24+
use crate::error::storage_error;
2425
use crate::response::{DOCKER_CONTENT_DIGEST, OCI_SUBJECT, oci_cors_headers};
2526

2627
/// Parse a reference string, returning the correct OCI error code on failure.
@@ -63,10 +64,7 @@ async fn resolve_reference(
6364
) -> Result<Digest, HttpError> {
6465
match reference {
6566
Reference::Digest(d) => Ok(d.clone()),
66-
Reference::Tag(t) => storage
67-
.resolve_tag(name, t)
68-
.await
69-
.map_err(|e| crate::error::into_http_error(OciError::from(e))),
67+
Reference::Tag(t) => storage.resolve_tag(name, t).await.map_err(storage_error),
7068
}
7169
}
7270

@@ -127,7 +125,7 @@ pub async fn oci_manifest_exists(
127125
let manifest = storage
128126
.get_manifest_by_digest(&project_uuid, &digest)
129127
.await
130-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
128+
.map_err(storage_error)?;
131129

132130
// Determine content type from typed manifest
133131
let parsed = Manifest::from_bytes(&manifest).map_err(|e| {
@@ -188,7 +186,7 @@ pub async fn oci_manifest_get(
188186
let manifest = storage
189187
.get_manifest_by_digest(&project_uuid, &digest)
190188
.await
191-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
189+
.map_err(storage_error)?;
192190

193191
// Record bandwidth usage
194192
#[cfg(feature = "plus")]
@@ -314,7 +312,7 @@ pub async fn oci_manifest_put(
314312
&parsed_manifest,
315313
)
316314
.await
317-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
315+
.map_err(storage_error)?;
318316

319317
// Record bandwidth usage
320318
#[cfg(feature = "plus")]
@@ -396,7 +394,7 @@ async fn verify_referenced_blobs(
396394
let exists = storage
397395
.blob_exists(repository, &digest)
398396
.await
399-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
397+
.map_err(storage_error)?;
400398
if !exists {
401399
return Err(crate::error::into_http_error(
402400
OciError::ManifestBlobUnknown { digest: digest_str },
@@ -442,7 +440,7 @@ pub async fn oci_manifest_delete(
442440
Reference::Tag(tag) => storage
443441
.resolve_tag(&project_uuid, tag)
444442
.await
445-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?,
443+
.map_err(storage_error)?,
446444
};
447445

448446
match &reference {
@@ -451,14 +449,14 @@ pub async fn oci_manifest_delete(
451449
storage
452450
.delete_manifest(&project_uuid, digest)
453451
.await
454-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
452+
.map_err(storage_error)?;
455453
},
456454
Reference::Tag(tag) => {
457455
// Delete by tag - delete the tag link only (manifest may still exist)
458456
storage
459457
.delete_tag(&project_uuid, tag)
460458
.await
461-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
459+
.map_err(storage_error)?;
462460
},
463461
}
464462

plus/api_oci/src/referrers.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@
88
use bencher_endpoint::{CorsResponse, Endpoint, Get};
99
use bencher_json::ProjectResourceId;
1010
use bencher_json::oci::{OCI_IMAGE_INDEX_MEDIA_TYPE, OciImageIndex, OciManifestDescriptor};
11-
use bencher_oci_storage::{Digest, OciError};
11+
use bencher_oci_storage::Digest;
1212
use bencher_schema::context::ApiContext;
1313
use dropshot::{Body, HttpError, Path, Query, RequestContext, endpoint};
1414
use http::Response;
1515
use schemars::JsonSchema;
1616
use serde::Deserialize;
1717

1818
use crate::auth::{require_pull_access, resolve_project};
19+
use crate::error::storage_error;
1920
use crate::response::{OCI_FILTERS_APPLIED, oci_cors_headers};
2021

2122
/// Path parameters for referrers endpoint
@@ -85,7 +86,7 @@ pub async fn oci_referrers_list(
8586
let referrers = storage
8687
.list_referrers(&project_uuid, &digest, query.artifact_type.as_deref())
8788
.await
88-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
89+
.map_err(storage_error)?;
8990

9091
// Build an OCI image index response
9192
// Per spec: returns application/vnd.oci.image.index.v1+json

plus/api_oci/src/tags.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use schemars::JsonSchema;
1212
use serde::{Deserialize, Serialize};
1313

1414
use crate::auth::{require_pull_access, resolve_project};
15+
use crate::error::storage_error;
1516
use crate::response::{APPLICATION_JSON, oci_cors_headers};
1617

1718
/// Path parameters for tags list
@@ -106,7 +107,7 @@ pub async fn oci_tags_list(
106107
let result = storage
107108
.list_tags(&project_uuid, Some(page_size), last_tag)
108109
.await
109-
.map_err(|e| crate::error::into_http_error(OciError::from(e)))?;
110+
.map_err(storage_error)?;
110111

111112
// Record metric
112113
#[cfg(feature = "otel")]

0 commit comments

Comments
 (0)