Skip to content

Commit b779f5a

Browse files
authored
Refactor TaskContext to hold and use a Durable client (#62)
* Refactor TaskContext to hold and use a Durable client We now delegate to the existing 'spawn_by_name' (wrapping it in a checkpoint in TaskContext). This lets us re-use all of the existing logic, including the OpenTelemetry context propagation logic. This will give us a tree structure in OpenTelemetry - subtasks will use their parent task as the parent trace * Run clippy * Fix telemetry * Set parent_task_id * Fix review comments * Mark owns_pool as false when client is cloesd
1 parent 3a06598 commit b779f5a

File tree

15 files changed

+318
-392
lines changed

15 files changed

+318
-392
lines changed

src/client.rs

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use serde_json::Value as JsonValue;
33
use sqlx::{Executor, PgPool, Postgres};
44
use std::collections::HashMap;
55
use std::sync::Arc;
6+
use std::sync::atomic::{AtomicBool, Ordering};
67
use std::time::Duration;
78
use tokio::sync::RwLock;
89
use uuid::Uuid;
@@ -24,6 +25,8 @@ struct SpawnOptionsDb<'a> {
2425
retry_strategy: Option<&'a RetryStrategy>,
2526
#[serde(skip_serializing_if = "Option::is_none")]
2627
cancellation: Option<CancellationPolicyDb>,
28+
#[serde(skip_serializing_if = "Option::is_none")]
29+
parent_task_id: Option<&'a Uuid>,
2730
}
2831

2932
/// Internal struct for serializing cancellation policy (only non-None fields).
@@ -108,13 +111,42 @@ where
108111
State: Clone + Send + Sync + 'static,
109112
{
110113
pool: PgPool,
111-
owns_pool: bool,
114+
owns_pool: AtomicBool,
112115
queue_name: String,
113116
spawn_defaults: SpawnDefaults,
114117
registry: Arc<RwLock<TaskRegistry<State>>>,
115118
state: State,
116119
}
117120

121+
impl<State: Clone + Send + Sync> Durable<State> {
122+
/// TODO: Decide if we want to implement `Clone`,
123+
/// which will allow consumers to clone `Durable`
124+
/// Currently, we only allow cloning with in the crate
125+
/// via this method
126+
pub(crate) fn clone_inner(&self) -> Durable<State> {
127+
// When we clone a durable client, mark *ourself* as no longer owning the pool
128+
// This will cause `Durable.close()` to be a no-op, since something else could
129+
// still be using the pool.
130+
// sqlx itself will still close the pool when the last reference to it is dropped.
131+
// At the moment, we only call `clone_inner` when spawning a worker, which has its own
132+
// `shutdown()` method.
133+
self.owns_pool.store(false, Ordering::Relaxed);
134+
Durable {
135+
pool: self.pool.clone(),
136+
// A clone of a durable client never owns the pool, so we set this to false
137+
owns_pool: AtomicBool::new(false),
138+
queue_name: self.queue_name.clone(),
139+
spawn_defaults: self.spawn_defaults.clone(),
140+
registry: self.registry.clone(),
141+
state: self.state.clone(),
142+
}
143+
}
144+
145+
pub(crate) fn registry(&self) -> &Arc<RwLock<TaskRegistry<State>>> {
146+
&self.registry
147+
}
148+
}
149+
118150
/// Builder for configuring a [`Durable`] client.
119151
///
120152
/// # Example
@@ -252,7 +284,7 @@ impl DurableBuilder {
252284

253285
Ok(Durable {
254286
pool,
255-
owns_pool,
287+
owns_pool: AtomicBool::new(owns_pool),
256288
queue_name: self.queue_name,
257289
spawn_defaults: self.spawn_defaults,
258290
registry: Arc::new(RwLock::new(HashMap::new())),
@@ -557,6 +589,7 @@ where
557589
.cancellation
558590
.as_ref()
559591
.and_then(CancellationPolicyDb::from_policy),
592+
parent_task_id: options.parent_task_id.as_ref(),
560593
};
561594
serde_json::to_value(db_options)
562595
}
@@ -692,20 +725,12 @@ where
692725
});
693726
}
694727

695-
Ok(Worker::start(
696-
self.pool.clone(),
697-
self.queue_name.clone(),
698-
self.registry.clone(),
699-
options,
700-
self.state.clone(),
701-
self.spawn_defaults.clone(),
702-
)
703-
.await)
728+
Ok(Worker::start(self.clone_inner(), options).await)
704729
}
705730

706731
/// Close the client. Closes the pool if owned.
707732
pub async fn close(self) {
708-
if self.owns_pool {
733+
if self.owns_pool.load(Ordering::Relaxed) {
709734
self.pool.close().await;
710735
}
711736
}

0 commit comments

Comments
 (0)