Skip to content
This repository was archived by the owner on Jul 28, 2025. It is now read-only.

Commit f143b25

Browse files
committed
api: Update 'missing_content' to give upload endpoint(s)
1 parent e3499ff commit f143b25

7 files changed

Lines changed: 139 additions & 62 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/api/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,4 @@ warg-crypto = { workspace = true }
1111
serde = { workspace = true }
1212
serde_with = { workspace = true }
1313
thiserror = { workspace = true }
14+
itertools = { workspace = true }

crates/api/src/v1/package.rs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,26 @@ pub enum ContentSource {
2121
},
2222
}
2323

24+
/// Represents the supported kinds of content upload endpoints.
25+
#[derive(Clone, Debug, Serialize, Deserialize)]
26+
#[serde(tag = "type", rename_all = "camelCase")]
27+
pub enum UploadEndpoint {
28+
/// Content may be uploaded via HTTP POST to the given URL.
29+
/// If the endpoint responds with "201 Created" and a Location header, that
30+
/// header's value will be the content source.
31+
HttpPost {
32+
/// The URL to POST content to.
33+
url: String,
34+
},
35+
}
36+
37+
/// Information about missing content.
38+
#[derive(Clone, Debug, Serialize, Deserialize)]
39+
pub struct MissingContent {
40+
/// Upload endpoint(s) that may be used to provide missing content.
41+
pub upload: Vec<UploadEndpoint>,
42+
}
43+
2444
/// Represents a request to publish a record to a package log.
2545
#[derive(Serialize, Deserialize)]
2646
#[serde(rename = "camelCase")]
@@ -45,13 +65,13 @@ pub struct PackageRecord {
4565
}
4666

4767
impl PackageRecord {
48-
/// Gets the missing content digests of the record.
49-
pub fn missing_content(&self) -> &[AnyHash] {
68+
/// Gets the missing content of the record.
69+
pub fn missing_content(&self) -> impl Iterator<Item = (&AnyHash, &MissingContent)> {
5070
match &self.state {
5171
PackageRecordState::Sourcing {
5272
missing_content, ..
53-
} => missing_content,
54-
_ => &[],
73+
} => itertools::Either::Left(missing_content.iter()),
74+
_ => itertools::Either::Right(std::iter::empty()),
5575
}
5676
}
5777
}
@@ -69,7 +89,7 @@ pub enum PackageRecordState {
6989
#[serde(rename_all = "camelCase")]
7090
Sourcing {
7191
/// The digests of the missing content.
72-
missing_content: Vec<AnyHash>,
92+
missing_content: HashMap<AnyHash, MissingContent>,
7393
},
7494
/// The package record is processing.
7595
#[serde(rename_all = "camelCase")]

crates/client/src/api.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -301,15 +301,12 @@ impl Client {
301301
/// Uploads package content to the registry.
302302
pub async fn upload_content(
303303
&self,
304-
log_id: &LogId,
305-
record_id: &RecordId,
306-
digest: &AnyHash,
304+
url: &str,
307305
content: impl Into<Body>,
308306
) -> Result<String, ClientError> {
309-
let url = self
310-
.url
311-
.join(&paths::package_record_content(log_id, record_id, digest))
312-
.unwrap();
307+
// Upload URLs may be relative to the registry URL.
308+
let url = self.url.join(url);
309+
313310
tracing::debug!("uploading content to `{url}`");
314311

315312
let response = self.client.post(url).body(content).send().await?;

crates/client/src/lib.rs

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ use storage::{
1313
use thiserror::Error;
1414
use warg_api::v1::{
1515
fetch::{FetchError, FetchLogsRequest, FetchLogsResponse},
16-
package::{PackageError, PackageRecord, PackageRecordState, PublishRecordRequest},
16+
package::{
17+
MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
18+
UploadEndpoint,
19+
},
1720
proof::{ConsistencyRequest, InclusionRequest},
1821
};
1922
use warg_crypto::{
@@ -29,8 +32,8 @@ use warg_protocol::{
2932
pub mod api;
3033
mod config;
3134
pub mod lock;
32-
pub mod storage;
3335
mod registry_url;
36+
pub mod storage;
3437
pub use self::config::*;
3538
pub use self::registry_url::RegistryUrl;
3639

@@ -156,34 +159,33 @@ impl<R: RegistryStorage, C: ContentStorage> Client<R, C> {
156159
})
157160
})?;
158161

159-
let missing = record.missing_content();
160-
if !missing.is_empty() {
161-
// Upload the missing content
162-
// TODO: parallelize this
163-
for digest in record.missing_content() {
164-
self.api
165-
.upload_content(
166-
&log_id,
167-
&record.id,
168-
digest,
169-
Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
170-
|| ClientError::ContentNotFound {
171-
digest: digest.clone(),
172-
},
173-
)?),
174-
)
175-
.await
176-
.map_err(|e| match e {
177-
api::ClientError::Package(PackageError::Rejection(reason)) => {
178-
ClientError::PublishRejected {
179-
id: package.id.clone(),
180-
record_id: record.id.clone(),
181-
reason,
182-
}
162+
// TODO: parallelize this
163+
for (digest, MissingContent { upload }) in record.missing_content() {
164+
// Upload the missing content, if the registry supports it
165+
let Some(UploadEndpoint::HttpPost {url}) = upload.first() else {
166+
continue;
167+
};
168+
169+
self.api
170+
.upload_content(
171+
url,
172+
Body::wrap_stream(self.content.load_content(digest).await?.ok_or_else(
173+
|| ClientError::ContentNotFound {
174+
digest: digest.clone(),
175+
},
176+
)?),
177+
)
178+
.await
179+
.map_err(|e| match e {
180+
api::ClientError::Package(PackageError::Rejection(reason)) => {
181+
ClientError::PublishRejected {
182+
id: package.id.clone(),
183+
record_id: record.id.clone(),
184+
reason,
183185
}
184-
_ => e.into(),
185-
})?;
186-
}
186+
}
187+
_ => e.into(),
188+
})?;
187189
}
188190

189191
Ok(record.id)

crates/server/openapi.yaml

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -700,12 +700,9 @@ components:
700700
enum: [sourcing]
701701
example: sourcing
702702
missingContent:
703-
type: array
704-
description: The array of content digests that are missing for the package record.
705-
minItems: 1
706-
maxItems: 128
707-
items:
708-
"$ref": "#/components/schemas/AnyHash"
703+
"$ref": "#/components/schemas/MissingContentMap"
704+
description: The missing content for the package record.
705+
minProperties: 1
709706
ProcessingRecord:
710707
type: object
711708
description: A record that is being processed.
@@ -825,6 +822,44 @@ components:
825822
description: The algorithm-prefixed bytes of the signature (base64 encoded).
826823
pattern: ^[a-z0-9-]+:(?:[A-Za-z0-9+\/]{4})*(?:[A-Za-z0-9+\/]{4}|[A-Za-z0-9+\/]{3}=|[A-Za-z0-9+\/]{2}={2})$
827824
example: "ecdsa-p256:MEUCIQCzWZBW6ux9LecP66Y+hjmLZTP/hZVz7puzlPTXcRT2wwIgQZO7nxP0nugtw18MwHZ26ROFWcJmgCtKOguK031Y1D0="
825+
MissingContentMap:
826+
type: object
827+
description: The map of content digest to missing content info.
828+
patternProperties:
829+
"^[a-z0-9-]+:[a-f0-9]+$":
830+
"$ref": "#/components/schemas/MissingContent"
831+
example:
832+
? "sha256:7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded9773"
833+
: upload:
834+
- type: httpPost
835+
url: https://example.com/7d865e959b2466918c9863afca942d0fb89d7c9ac0c99bafc3749504ded9773
836+
MissingContent:
837+
description: Information about missing content.
838+
properties:
839+
upload:
840+
description: Upload endpoint(s) for the missing content.
841+
type: array
842+
items:
843+
oneOf:
844+
- "$ref": "#/components/schemas/HttpPostUpload"
845+
discriminator:
846+
propertyName: type
847+
mapping:
848+
httpPost: "#/components/schemas/HttpPostUpload"
849+
HttpPostUpload:
850+
type: object
851+
description: A HTTP POST upload endpoint.
852+
properties:
853+
type:
854+
type: string
855+
description: The type of upload endpoint.
856+
enum: [httpPost]
857+
example: httpPost
858+
url:
859+
type: string
860+
description: The URL of the upload endpoint, which may be relative to the API base URL.
861+
example: https://example.com/contents.wasm
862+
format: uri
828863
ContentSourceMap:
829864
type: object
830865
description: The map of content digest to sources.

crates/server/src/api/v1/package.rs

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ use axum::{
1616
Router,
1717
};
1818
use futures::StreamExt;
19-
use std::path::PathBuf;
2019
use std::sync::Arc;
20+
use std::{collections::HashMap, path::PathBuf};
2121
use tempfile::NamedTempFile;
2222
use tokio::io::AsyncWriteExt;
2323
use url::Url;
2424
use warg_api::v1::package::{
25-
ContentSource, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
25+
ContentSource, MissingContent, PackageError, PackageRecord, PackageRecordState,
26+
PublishRecordRequest, UploadEndpoint,
2627
};
2728
use warg_crypto::hash::{AnyHash, Sha256};
2829
use warg_protocol::{
@@ -91,6 +92,26 @@ impl Config {
9192
.unwrap()
9293
.to_string()
9394
}
95+
96+
fn build_missing_content<'a>(
97+
&self,
98+
log_id: &LogId,
99+
record_id: &RecordId,
100+
missing_digests: impl IntoIterator<Item = &'a AnyHash>,
101+
) -> HashMap<AnyHash, MissingContent> {
102+
missing_digests
103+
.into_iter()
104+
.map(|digest| {
105+
let url = format!("v1/package/{log_id}/record/{record_id}/content/{digest}");
106+
(
107+
digest.clone(),
108+
MissingContent {
109+
upload: vec![UploadEndpoint::HttpPost { url }],
110+
},
111+
)
112+
})
113+
.collect()
114+
}
94115
}
95116

96117
struct PackageApiError(PackageError);
@@ -231,13 +252,12 @@ async fn publish_record(
231252
));
232253
}
233254

255+
let missing_content = config.build_missing_content(&log_id, &record_id, missing);
234256
Ok((
235257
StatusCode::ACCEPTED,
236258
Json(PackageRecord {
237259
id: record_id,
238-
state: PackageRecordState::Sourcing {
239-
missing_content: missing.into_iter().cloned().collect(),
240-
},
260+
state: PackageRecordState::Sourcing { missing_content },
241261
}),
242262
))
243263
}
@@ -254,12 +274,13 @@ async fn get_record(
254274
.await?;
255275

256276
match record.status {
257-
RecordStatus::MissingContent(missing) => Ok(Json(PackageRecord {
258-
id: record_id,
259-
state: PackageRecordState::Sourcing {
260-
missing_content: missing,
261-
},
262-
})),
277+
RecordStatus::MissingContent(missing) => {
278+
let missing_content = config.build_missing_content(&log_id, &record_id, &missing);
279+
Ok(Json(PackageRecord {
280+
id: record_id,
281+
state: PackageRecordState::Sourcing { missing_content },
282+
}))
283+
}
263284
// Validated is considered still processing until included in a checkpoint
264285
RecordStatus::Pending | RecordStatus::Validated => Ok(Json(PackageRecord {
265286
id: record_id,
@@ -275,11 +296,11 @@ async fn get_record(
275296
.as_ref()
276297
.contents()
277298
.into_iter()
278-
.map(|d| {
299+
.map(|digest| {
279300
(
280-
d.clone(),
301+
digest.clone(),
281302
vec![ContentSource::Http {
282-
url: config.content_url(d),
303+
url: config.content_url(digest),
283304
}],
284305
)
285306
})
@@ -363,7 +384,7 @@ async fn upload_content(
363384
{
364385
config
365386
.core_service
366-
.submit_package_record(log_id, record_id.clone())
387+
.submit_package_record(log_id, record_id)
367388
.await;
368389
}
369390

0 commit comments

Comments
 (0)