Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 42 additions & 146 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Instant, SystemTime};

use super::messages::{OneOffQueryResponseMessage, ProcedureResultMessage};
use super::messages::OneOffQueryResponseMessage;
use super::{message_handlers, ClientActorId, MessageHandleError, OutboundMessage};
use crate::db::relational_db::RelationalDB;
use crate::error::DBError;
use crate::host::module_host::ClientConnectedError;
use crate::host::{
CallProcedureReturn, FunctionArgs, ModuleHost, NoSuchModule, ProcedureCallResult, ReducerCallError,
ReducerCallResult,
};
use crate::host::{FunctionArgs, ModuleHost, NoSuchModule, ReducerCallError};
use crate::subscription::module_subscription_manager::BroadcastError;
use crate::subscription::row_list_builder_pool::JsonRowListBuilderFakePool;
use crate::util::asyncify;
use crate::util::prometheus_handle::IntGaugeExt;
use crate::worker_metrics::WORKER_METRICS;
use bytes::Bytes;
Expand All @@ -30,8 +26,7 @@ use spacetimedb_auth::identity::{ConnectionAuthCtx, SpacetimeIdentityClaims};
use spacetimedb_client_api_messages::websocket::{common as ws_common, v1 as ws_v1, v2 as ws_v2};
use spacetimedb_durability::{DurableOffset, TxOffset};
use spacetimedb_lib::identity::{AuthCtx, RequestId};
use spacetimedb_lib::metrics::ExecutionMetrics;
use spacetimedb_lib::{bsatn, Identity, TimeDuration, Timestamp};
use spacetimedb_lib::Identity;
use tokio::sync::mpsc::error::{SendError, TrySendError};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::AbortHandle;
Expand Down Expand Up @@ -841,7 +836,7 @@ impl ClientConnection {
request_id: RequestId,
timer: Instant,
flags: ws_v1::CallReducerFlags,
) -> Result<ReducerCallResult, ReducerCallError> {
) -> Result<(), ReducerCallError> {
let caller = match flags {
ws_v1::CallReducerFlags::FullUpdate => Some(self.sender()),
// Setting `sender = None` causes `eval_updates` to skip sending to the caller
Expand All @@ -850,7 +845,7 @@ impl ClientConnection {
};

self.module()
.call_reducer(
.call_reducer_ws(
self.id.identity,
Some(self.id.connection_id),
caller,
Expand All @@ -869,9 +864,9 @@ impl ClientConnection {
request_id: RequestId,
timer: Instant,
_flags: ws_v2::CallReducerFlags,
) -> Result<ReducerCallResult, ReducerCallError> {
) -> Result<(), ReducerCallError> {
self.module()
.call_reducer(
.call_reducer_ws(
self.id.identity,
Some(self.id.connection_id),
Some(self.sender()),
Expand All @@ -890,22 +885,17 @@ impl ClientConnection {
request_id: RequestId,
timer: Instant,
) -> Result<(), BroadcastError> {
let CallProcedureReturn { result, tx_offset } = self
.module()
.call_procedure(
self.module()
.call_procedure_ws(
self.sender(),
request_id,
self.id.identity,
Some(self.id.connection_id),
Some(timer),
procedure,
args,
)
.await;

let message = ProcedureResultMessage::from_result(&result, request_id);

self.module()
.subscriptions()
.send_procedure_message(self.sender(), message, tx_offset)
.await
}

pub async fn call_procedure_v2(
Expand All @@ -916,152 +906,58 @@ impl ClientConnection {
timer: Instant,
_flags: ws_v2::CallProcedureFlags,
) -> Result<(), BroadcastError> {
let CallProcedureReturn { result, tx_offset } = self
.module()
.call_procedure(
self.module()
.call_procedure_ws_v2(
self.sender(),
request_id,
self.id.identity,
Some(self.id.connection_id),
Some(timer),
procedure,
FunctionArgs::Bsatn(args),
)
.await;

let (status, timestamp, execution_duration) = match result {
Ok(ProcedureCallResult {
return_val,
execution_duration,
start_timestamp,
}) => (
ws_v2::ProcedureStatus::Returned(
bsatn::to_vec(&return_val)
.expect("Procedure return value failed to serialize to BSATN")
.into(),
),
start_timestamp,
TimeDuration::from(execution_duration),
),
Err(err) => (
ws_v2::ProcedureStatus::InternalError(err.to_string().into()),
Timestamp::UNIX_EPOCH,
TimeDuration::ZERO,
),
};

let message = ws_v2::ProcedureResult {
status,
timestamp,
total_host_execution_duration: execution_duration,
request_id,
};

self.module()
.subscriptions()
.send_procedure_message_v2(self.sender(), message, tx_offset)
.await
}

pub async fn subscribe_single(
&self,
subscription: ws_v1::SubscribeSingle,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
pub async fn subscribe_single(&self, subscription: ws_v1::SubscribeSingle, timer: Instant) -> Result<(), DBError> {
self.module()
.on_module_thread_async("subscribe_single", async move || {
let host = me.module();
host.subscriptions()
.add_single_subscription(Some(&host), me.sender, me.auth.clone(), subscription, timer, None)
.await
})
.await?
.call_view_add_single_subscription_ws(self.sender(), self.auth.clone(), subscription, timer)
.await
}

pub async fn unsubscribe(
&self,
request: ws_v1::Unsubscribe,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
asyncify(move || {
me.module()
.subscriptions()
.remove_single_subscription(me.sender, me.auth.clone(), request, timer)
})
.await
pub async fn unsubscribe(&self, request: ws_v1::Unsubscribe, timer: Instant) -> Result<(), DBError> {
self.module()
.call_view_remove_single_subscription_ws(self.sender(), self.auth.clone(), request, timer)
.await
}

pub async fn subscribe_v2(
&self,
request: ws_v2::Subscribe,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
pub async fn subscribe_v2(&self, request: ws_v2::Subscribe, timer: Instant) -> Result<(), DBError> {
self.module()
.on_module_thread_async("subscribe_v2", async move || {
let host = me.module();
host.subscriptions()
.add_v2_subscription(Some(&host), me.sender, me.auth.clone(), request, timer, None)
.await
})
.await?
.call_view_add_v2_subscription_ws(self.sender(), self.auth.clone(), request, timer)
.await
}
pub async fn subscribe_multi(
&self,
request: ws_v1::SubscribeMulti,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
pub async fn subscribe_multi(&self, request: ws_v1::SubscribeMulti, timer: Instant) -> Result<(), DBError> {
self.module()
.on_module_thread_async("subscribe_multi", async move || {
let host = me.module();
host.subscriptions()
.add_multi_subscription(Some(&host), me.sender, me.auth.clone(), request, timer, None)
.await
})
.await?
.call_view_add_multi_subscription_ws(self.sender(), self.auth.clone(), request, timer)
.await
}

pub async fn unsubscribe_multi(
&self,
request: ws_v1::UnsubscribeMulti,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
pub async fn unsubscribe_multi(&self, request: ws_v1::UnsubscribeMulti, timer: Instant) -> Result<(), DBError> {
self.module()
.on_module_thread("unsubscribe_multi", move || {
me.module()
.subscriptions()
.remove_multi_subscription(me.sender, me.auth.clone(), request, timer)
})
.await?
.call_view_remove_multi_subscription_ws(self.sender(), self.auth.clone(), request, timer)
.await
}

pub async fn unsubscribe_v2(
&self,
request: ws_v2::Unsubscribe,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
pub async fn unsubscribe_v2(&self, request: ws_v2::Unsubscribe, timer: Instant) -> Result<(), DBError> {
self.module()
.on_module_thread_async("unsubscribe_v2", async move || {
let host = me.module();
host.subscriptions()
.remove_v2_subscription(Some(&host), me.sender, me.auth.clone(), request, timer)
.await
})
.await?
.call_view_remove_v2_subscription_ws(self.sender(), self.auth.clone(), request, timer)
.await
}

pub async fn subscribe(&self, subscription: ws_v1::Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
let me = self.clone();
pub async fn subscribe(&self, subscription: ws_v1::Subscribe, timer: Instant) -> Result<(), DBError> {
self.module()
.on_module_thread_async("subscribe", async move || {
let host = me.module();
host.subscriptions()
.add_legacy_subscriber(Some(&host), me.sender, me.auth.clone(), subscription, timer, None)
.await
})
.await?
.call_view_add_legacy_subscription_ws(self.sender(), self.auth.clone(), subscription, timer)
.await
}

pub async fn one_off_query_json(
Expand All @@ -1071,7 +967,7 @@ impl ClientConnection {
timer: Instant,
) -> Result<(), anyhow::Error> {
self.module()
.one_off_query::<ws_v1::JsonFormat>(
.one_off_query_ws::<ws_v1::JsonFormat>(
self.auth.clone(),
query.to_owned(),
self.sender.clone(),
Expand All @@ -1091,7 +987,7 @@ impl ClientConnection {
) -> Result<(), anyhow::Error> {
let bsatn_rlb_pool = self.module().replica_ctx().subscriptions.bsatn_rlb_pool.clone();
self.module()
.one_off_query::<ws_v1::BsatnFormat>(
.one_off_query_ws::<ws_v1::BsatnFormat>(
self.auth.clone(),
query.to_owned(),
self.sender.clone(),
Expand All @@ -1106,7 +1002,7 @@ impl ClientConnection {
pub async fn one_off_query_v2(&self, query: &str, request_id: u32, timer: Instant) -> Result<(), anyhow::Error> {
let bsatn_rlb_pool = self.module().replica_ctx().subscriptions.bsatn_rlb_pool.clone();
self.module()
.one_off_query_v2(
.one_off_query_v2_ws(
self.auth.clone(),
query.to_owned(),
self.sender.clone(),
Expand Down
42 changes: 17 additions & 25 deletions crates/core/src/client/message_handlers_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
let mod_info = module.info();
let mod_metrics = &mod_info.metrics;
let database_identity = mod_info.database_identity;
let db = module.relational_db();
let record_metrics = |wl| {
move |metrics| {
if let Some(metrics) = metrics {
db.exec_counters_for(wl).record(&metrics);
}
}
};
let sub_metrics = record_metrics(WorkloadType::Subscribe);
let unsub_metrics = record_metrics(WorkloadType::Unsubscribe);

type HandleResult<'a> = Result<(), (Option<&'a RawIdentifier>, Option<ReducerId>, anyhow::Error)>;
let res: HandleResult<'_> = match message {
ws_v1::ClientMessage::CallReducer(ws_v1::CallReducer {
Expand All @@ -59,49 +48,52 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
request_id,
flags,
}) => {
let res = client.call_reducer(reducer, args, request_id, timer, flags).await;
let res = client
.call_reducer(reducer, args, request_id, timer, flags)
.await
.map_err(|e| {
(
Some(reducer),
mod_info.module_def.reducer_full(&**reducer).map(|(id, _)| id),
e.into(),
)
});
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Reducer, &database_identity, reducer)
.observe(timer.elapsed().as_secs_f64());
res.map(drop).map_err(|e| {
(
Some(reducer),
mod_info.module_def.reducer_full(&**reducer).map(|(id, _)| id),
e.into(),
)
})
res
}
ws_v1::ClientMessage::SubscribeMulti(subscription) => {
let res = client.subscribe_multi(subscription, timer).await.map(sub_metrics);
let res = client.subscribe_multi(subscription, timer).await;
mod_metrics
.request_round_trip_subscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ws_v1::ClientMessage::UnsubscribeMulti(request) => {
let res = client.unsubscribe_multi(request, timer).await.map(unsub_metrics);
let res = client.unsubscribe_multi(request, timer).await;
mod_metrics
.request_round_trip_unsubscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ws_v1::ClientMessage::SubscribeSingle(subscription) => {
let res = client.subscribe_single(subscription, timer).await.map(sub_metrics);
let res = client.subscribe_single(subscription, timer).await;
mod_metrics
.request_round_trip_subscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ws_v1::ClientMessage::Unsubscribe(request) => {
let res = client.unsubscribe(request, timer).await.map(unsub_metrics);
let res = client.unsubscribe(request, timer).await;
mod_metrics
.request_round_trip_unsubscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ws_v1::ClientMessage::Subscribe(subscription) => {
let res = client.subscribe(subscription, timer).await.map(Some).map(sub_metrics);
let res = client.subscribe(subscription, timer).await;
mod_metrics
.request_round_trip_subscribe
.observe(timer.elapsed().as_secs_f64());
Expand Down Expand Up @@ -134,7 +126,7 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
if let Err(e) = res {
log::warn!("Procedure call failed: {e:#}");
}
// `ClientConnection::call_procedure` handles sending the error message to the client if the call fails,
// `ClientConnection::call_procedure` handles sending the procedure result message itself,
// so we don't need to return an `Err` here.
Ok(())
}
Expand Down
Loading
Loading