Skip to content

Commit 66502c3

Browse files
fix: shutdown called on providers
Signed-off-by: Fabrizio Demaria <fabrizio.f.demaria@gmail.com>
1 parent 7f74461 commit 66502c3

3 files changed

Lines changed: 110 additions & 11 deletions

File tree

src/api/api.rs

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,33 @@ mod tests {
199199
number = "1.1.2.3",
200200
text = "The provider mutator function MUST invoke the shutdown function on the previously registered provider once it's no longer being used to resolve flag values."
201201
)]
202-
#[test]
203-
fn invoke_shutdown_on_old_provider_checked_by_type_system() {}
202+
#[tokio::test]
203+
async fn invoke_shutdown_on_old_provider() {
204+
use std::sync::Arc;
205+
use tokio::sync::Notify;
206+
use tokio::time::{timeout, Duration};
207+
208+
let shutdown_notify = Arc::new(Notify::new());
209+
let shutdown_notify_for_provider = shutdown_notify.clone();
210+
211+
let mut old_provider = MockFeatureProvider::new();
212+
old_provider.expect_initialize().returning(|_| {});
213+
old_provider
214+
.expect_shutdown()
215+
.returning(move || shutdown_notify_for_provider.notify_one())
216+
.once();
217+
218+
let mut new_provider = MockFeatureProvider::new();
219+
new_provider.expect_initialize().returning(|_| {});
220+
221+
let mut api = OpenFeature::default();
222+
api.set_provider(old_provider).await;
223+
api.set_provider(new_provider).await;
224+
225+
timeout(Duration::from_millis(200), shutdown_notify.notified())
226+
.await
227+
.expect("previous provider shutdown not invoked");
228+
}
204229

205230
#[spec(
206231
number = "1.1.3",
@@ -321,10 +346,28 @@ mod tests {
321346
)]
322347
#[tokio::test]
323348
async fn shutdown() {
349+
use std::sync::Arc;
350+
use tokio::sync::Notify;
351+
use tokio::time::{timeout, Duration};
352+
353+
let shutdown_notify = Arc::new(Notify::new());
354+
let shutdown_notify_for_provider = shutdown_notify.clone();
355+
324356
let mut api = OpenFeature::default();
325-
api.set_provider(NoOpProvider::default()).await;
357+
let mut provider = MockFeatureProvider::new();
358+
provider.expect_initialize().returning(|_| {});
359+
provider
360+
.expect_shutdown()
361+
.returning(move || shutdown_notify_for_provider.notify_one())
362+
.once();
363+
364+
api.set_provider(provider).await;
326365

327366
api.shutdown().await;
367+
368+
timeout(Duration::from_millis(200), shutdown_notify.notified())
369+
.await
370+
.expect("shutdown did not invoke provider shutdown");
328371
}
329372

330373
#[spec(

src/api/provider_registry.rs

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
use std::sync::atomic::{AtomicBool, Ordering};
12
use std::sync::Arc;
23
use std::{borrow::Borrow, collections::HashMap};
34

45
use tokio::sync::RwLock;
6+
use tokio::task::JoinHandle;
57

68
use crate::provider::{FeatureProvider, NoOpProvider};
79

@@ -32,20 +34,26 @@ impl ProviderRegistry {
3234
}
3335

3436
pub async fn set_default<T: FeatureProvider>(&self, mut provider: T) {
35-
let mut map = self.providers.write().await;
36-
map.remove("");
37+
let old_provider = self.providers.write().await.remove("");
38+
39+
if let Some(old_provider) = old_provider {
40+
old_provider.shutdown_in_background();
41+
}
3742

3843
provider
3944
.initialize(self.global_evaluation_context.get().await.borrow())
4045
.await;
4146

42-
map.insert(String::default(), FeatureProviderWrapper::new(provider));
47+
self.providers
48+
.write()
49+
.await
50+
.insert(String::default(), FeatureProviderWrapper::new(provider));
4351
}
4452

4553
pub async fn set_named<T: FeatureProvider>(&self, name: &str, mut provider: T) {
4654
// Drop the already registered provider if any.
47-
if self.get_named(name).await.is_some() {
48-
self.providers.write().await.remove(name);
55+
if let Some(old_provider) = self.providers.write().await.remove(name) {
56+
old_provider.shutdown_in_background();
4957
}
5058

5159
provider
@@ -74,7 +82,21 @@ impl ProviderRegistry {
7482
}
7583

7684
pub async fn clear(&self) {
85+
let providers: Vec<FeatureProviderWrapper> =
86+
self.providers.read().await.values().cloned().collect();
87+
88+
let mut shutdown_handles = Vec::with_capacity(providers.len());
89+
for provider in providers {
90+
if let Some(handle) = provider.shutdown_in_background() {
91+
shutdown_handles.push(handle);
92+
}
93+
}
94+
7795
self.providers.write().await.clear();
96+
97+
for handle in shutdown_handles {
98+
let _ = handle.await;
99+
}
78100
}
79101
}
80102

@@ -89,14 +111,44 @@ impl Default for ProviderRegistry {
89111
// ============================================================
90112

91113
#[derive(Clone)]
92-
pub struct FeatureProviderWrapper(Arc<dyn FeatureProvider>);
114+
pub struct FeatureProviderWrapper(Arc<ProviderEntry>);
93115

94116
impl FeatureProviderWrapper {
95117
pub fn new(provider: impl FeatureProvider) -> Self {
96-
Self(Arc::new(provider))
118+
Self(Arc::new(ProviderEntry::new(provider)))
97119
}
98120

99121
pub fn get(&self) -> Arc<dyn FeatureProvider> {
100-
self.0.clone()
122+
self.0.provider.clone()
123+
}
124+
125+
pub fn shutdown_in_background(&self) -> Option<JoinHandle<()>> {
126+
if self
127+
.0
128+
.shutdown_started
129+
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
130+
.is_ok()
131+
{
132+
let provider = self.get();
133+
Some(tokio::spawn(async move {
134+
provider.shutdown().await;
135+
}))
136+
} else {
137+
None
138+
}
139+
}
140+
}
141+
142+
struct ProviderEntry {
143+
provider: Arc<dyn FeatureProvider>,
144+
shutdown_started: AtomicBool,
145+
}
146+
147+
impl ProviderEntry {
148+
fn new(provider: impl FeatureProvider) -> Self {
149+
Self {
150+
provider: Arc::new(provider),
151+
shutdown_started: AtomicBool::new(false),
152+
}
101153
}
102154
}

src/provider/feature_provider.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ pub trait FeatureProvider: Send + Sync + 'static {
4545
ProviderStatus::Ready
4646
}
4747

48+
/// The provider MAY define a mechanism to gracefully shutdown and dispose of resources.
49+
#[allow(unused_variables)]
50+
async fn shutdown(&self) {}
51+
4852
/// The provider interface MUST define a metadata member or accessor, containing a name field
4953
/// or accessor of type string, which identifies the provider implementation.
5054
fn metadata(&self) -> &ProviderMetadata;

0 commit comments

Comments
 (0)