Skip to content
18 changes: 18 additions & 0 deletions relay-server/src/services/projects/cache/handle.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
Expand Down Expand Up @@ -32,6 +33,23 @@ impl ProjectCacheHandle {
Project::new(project, &self.config)
}

/// Awaits until the given project state becomes ready (enabled or disabled).
///
/// Returns an empty [`Err`] if the project config cannot be resolved in the given time.
pub async fn get_ready(
&self,
project_key: ProjectKey,
timeout: Duration,
) -> Result<Project<'_>, ()> {
Comment thread
jjbayer marked this conversation as resolved.
Outdated
// Always trigger a fetch after retrieving the project to make sure the state is up to date.
self.fetch(project_key);
Comment thread
jjbayer marked this conversation as resolved.

Ok(Project::new(
self.shared.get_ready(project_key, timeout).await?,
&self.config,
))
}

/// Triggers a fetch/update check in the project cache for the supplied project.
pub fn fetch(&self, project_key: ProjectKey) {
self.service.send(ProjectCache::Fetch(project_key));
Expand Down
72 changes: 69 additions & 3 deletions relay-server/src/services/projects/cache/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::StreamExt;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::Instant;

use arc_swap::ArcSwap;
Expand Down Expand Up @@ -298,16 +299,32 @@ impl Shared {
/// The caller must ensure that the project cache is instructed to
/// [`super::ProjectCache::Fetch`] the retrieved project.
pub fn get_or_create(&self, project_key: ProjectKey) -> SharedProject {
self.get_or_create_inner(project_key).to_shared_project()
}

/// Awaits until the contained project state becomes ready (enabled or disabled).
///
/// Returns an empty [`Err`] if the project config cannot be resolved in the given time.
pub async fn get_ready(
&self,
project_key: ProjectKey,
timeout: Duration,
) -> Result<SharedProject, ()> {
let shared = self.get_or_create_inner(project_key);
shared.ready_project(timeout).await
}

Comment thread
jjbayer marked this conversation as resolved.
Outdated
fn get_or_create_inner(&self, project_key: ProjectKey) -> SharedProjectState {
// The fast path, we expect the project to exist.
let projects = self.projects.pin();
if let Some(project) = projects.get(&project_key) {
return project.to_shared_project();
return project.clone();
}

// The slow path, try to attempt to insert, somebody else may have been faster, but that's okay.
match projects.try_insert(project_key, Default::default()) {
Ok(inserted) => inserted.to_shared_project(),
Err(occupied) => occupied.current.to_shared_project(),
Ok(inserted) => inserted.clone(),
Err(occupied) => occupied.current.clone(),
}
}
}
Expand Down Expand Up @@ -603,6 +620,7 @@ impl SharedProjectState {
state: state.clone(),
rate_limits: Arc::clone(&stored.rate_limits),
reservoir_counters: Arc::clone(&stored.reservoir_counters),
notify: Arc::clone(&stored.notify),
});

// Try clean expired reservoir counters.
Expand All @@ -620,6 +638,30 @@ impl SharedProjectState {
}
}
}

// Finally, notify listeners:
prev.notify.notify_waiters();
}

/// Awaits until the contained project state becomes ready (enabled or disabled).
///
/// Returns an empty [`Err`] if the project config cannot be resolved in the given time.
pub async fn ready_project(&self, timeout: Duration) -> Result<SharedProject, ()> {
tokio::time::timeout(timeout, self.ready_project_inner())
Comment thread
jjbayer marked this conversation as resolved.
Outdated
.await
.map_err(|_| ())
}

async fn ready_project_inner(&self) -> SharedProject {
loop {
let inner = self.0.load_full();
// Register the listener _before_ checking the state to prevent a race.
let notified = inner.notify.notified();
if !inner.state.is_pending() {
return SharedProject(Arc::clone(&inner));
}
notified.await;
}
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
}

/// Extracts and clones the revision from the contained project state.
Expand All @@ -642,6 +684,7 @@ struct SharedProjectStateInner {
state: ProjectState,
rate_limits: Arc<CachedRateLimits>,
reservoir_counters: ReservoirCounters,
notify: Arc<Notify>,
}

/// Current fetch state for a project.
Expand Down Expand Up @@ -1317,4 +1360,27 @@ mod tests {

assert!(store.refresh(refresh).is_none());
}

#[tokio::test(start_paused = true)]
async fn test_ready_state() {
let shared = SharedProjectState::default();
let shared1 = shared.clone();

#[allow(clippy::disallowed_methods)]
Comment thread
jjbayer marked this conversation as resolved.
Outdated
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
shared1.set_project_state(ProjectState::Disabled);
});

// After five seconds, project state is still pending:
let result = shared.ready_project(Duration::from_secs(5)).await;
assert!(result.is_err());

// After another 10 seconds, the state will have been updated:
let result = shared.ready_project(Duration::from_secs(10)).await;
assert!(matches!(
result.unwrap().project_state(),
&ProjectState::Disabled
));
}
}
Loading