Skip to content

Commit 66c0a06

Browse files
committed
Production readiness: bounded channels, health probes, security tests, fuzz fixes
1 parent f7937cb commit 66c0a06

6 files changed

Lines changed: 147 additions & 19 deletions

File tree

crates/runtime/src/api/server.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ use crate::types::RuntimeError;
8282
super::routes::get_channel_audit,
8383
super::routes::agent_heartbeat,
8484
super::routes::agent_push_event,
85-
health_check
85+
health_check,
86+
liveness_check,
87+
readiness_check
8688
),
8789
components(
8890
schemas(
@@ -305,6 +307,8 @@ impl HttpApiServer {
305307

306308
let mut router = Router::new()
307309
.route("/api/v1/health", get(health_check))
310+
.route("/api/v1/health/live", get(liveness_check))
311+
.route("/api/v1/health/ready", get(readiness_check))
308312
.with_state(self.start_time);
309313

310314
// Add Swagger UI only in non-production environments
@@ -503,3 +507,44 @@ async fn health_check(
503507

504508
Ok(Json(response))
505509
}
510+
511+
/// Liveness probe — confirms the process is running and able to serve requests.
512+
#[cfg(feature = "http-api")]
513+
#[utoipa::path(
514+
get,
515+
path = "/api/v1/health/live",
516+
responses(
517+
(status = 200, description = "Process is alive"),
518+
(status = 503, description = "Process is not alive")
519+
),
520+
tag = "system"
521+
)]
522+
async fn liveness_check() -> StatusCode {
523+
// Liveness: the process is running and can serve requests.
524+
// No dependency checks — if this handler runs, we're alive.
525+
StatusCode::OK
526+
}
527+
528+
/// Readiness probe — confirms the system has completed initialization.
529+
#[cfg(feature = "http-api")]
530+
#[utoipa::path(
531+
get,
532+
path = "/api/v1/health/ready",
533+
responses(
534+
(status = 200, description = "System is ready to accept work"),
535+
(status = 503, description = "System is not ready")
536+
),
537+
tag = "system"
538+
)]
539+
async fn readiness_check(
540+
axum::extract::State(start_time): axum::extract::State<Instant>,
541+
) -> StatusCode {
542+
// Readiness: the system has completed initialization and can accept work.
543+
// After 5 seconds of uptime, consider the system ready.
544+
// In the future, check scheduler state, DB connections, etc.
545+
if start_time.elapsed().as_secs() >= 5 {
546+
StatusCode::OK
547+
} else {
548+
StatusCode::SERVICE_UNAVAILABLE
549+
}
550+
}

crates/runtime/src/integrations/composio/transport.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,19 @@ impl SseTransport {
2626
/// Returns an error if the API key contains invalid header characters
2727
/// or if the HTTP client fails to build.
2828
pub fn new(endpoint_url: String, api_key: String) -> Result<Self, ComposioError> {
29-
let header_value = HeaderValue::from_str(&api_key).map_err(|e| {
30-
ComposioError::Configuration(format!(
31-
"invalid API key (contains non-ASCII or control characters): {e}"
32-
))
33-
})?;
29+
let header_value =
30+
HeaderValue::from_str(&api_key).map_err(|e| ComposioError::ConfigError {
31+
reason: format!("invalid API key (contains non-ASCII or control characters): {e}"),
32+
})?;
3433

3534
let mut headers = HeaderMap::new();
3635
headers.insert("x-api-key", header_value);
3736

3837
let client = reqwest::Client::builder()
3938
.default_headers(headers)
4039
.build()
41-
.map_err(|e| {
42-
ComposioError::Configuration(format!("failed to build HTTP client: {e}"))
40+
.map_err(|e| ComposioError::ConfigError {
41+
reason: format!("failed to build HTTP client: {e}"),
4342
})?;
4443

4544
Ok(Self {

crates/runtime/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ pub mod api;
3434

3535
#[cfg(feature = "http-api")]
3636
use api::traits::RuntimeApiProvider;
37+
#[cfg(all(feature = "http-api", feature = "cron"))]
38+
use api::types::ScheduleRunEntry;
3739
#[cfg(feature = "http-api")]
3840
use api::types::{
3941
AddIdentityMappingRequest, AgentExecutionRecord, AgentStatusResponse, ChannelActionResponse,
@@ -42,8 +44,8 @@ use api::types::{
4244
DeleteChannelResponse, DeleteScheduleResponse, ExecuteAgentRequest, ExecuteAgentResponse,
4345
GetAgentHistoryResponse, IdentityMappingEntry, NextRunsResponse, RegisterChannelRequest,
4446
RegisterChannelResponse, ScheduleActionResponse, ScheduleDetail, ScheduleHistoryResponse,
45-
ScheduleRunEntry, ScheduleSummary, UpdateAgentRequest, UpdateAgentResponse,
46-
UpdateChannelRequest, UpdateScheduleRequest, WorkflowExecutionRequest,
47+
ScheduleSummary, UpdateAgentRequest, UpdateAgentResponse, UpdateChannelRequest,
48+
UpdateScheduleRequest, WorkflowExecutionRequest,
4749
};
4850
#[cfg(feature = "http-api")]
4951
use async_trait::async_trait;
@@ -365,6 +367,7 @@ pub struct RuntimeConfig {
365367
/// Implementation of RuntimeApiProvider for AgentRuntime
366368
#[cfg(feature = "http-api")]
367369
#[async_trait]
370+
#[allow(unused_variables)] // Params used inside #[cfg(feature = "cron")] blocks
368371
impl RuntimeApiProvider for AgentRuntime {
369372
async fn execute_workflow(
370373
&self,

crates/runtime/src/reasoning/tracing_spans.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ impl LoopTracer {
132132
);
133133
}
134134

135+
/// Get the trace ID for this loop (for external correlation).
136+
pub fn trace_id(&self) -> &str {
137+
&self.loop_id
138+
}
139+
135140
/// Get the loop ID.
136141
pub fn loop_id(&self) -> &str {
137142
&self.loop_id

crates/runtime/src/scheduler/task_manager.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::types::*;
1515
pub struct TaskManager {
1616
task_timeout: Duration,
1717
running_tasks: Arc<RwLock<HashMap<AgentId, TaskHandle>>>,
18-
task_sender: mpsc::UnboundedSender<TaskCommand>,
18+
task_sender: mpsc::Sender<TaskCommand>,
1919
#[allow(dead_code)]
2020
system_info: Arc<RwLock<System>>,
2121
}
@@ -24,7 +24,9 @@ impl TaskManager {
2424
/// Create a new task manager
2525
pub fn new(task_timeout: Duration) -> Self {
2626
let running_tasks = Arc::new(RwLock::new(HashMap::new()));
27-
let (task_sender, task_receiver) = mpsc::unbounded_channel();
27+
// Bounded channel prevents unbounded memory growth under load.
28+
// 1024 is sufficient for most workloads; submitters block when full.
29+
let (task_sender, task_receiver) = mpsc::channel(1024);
2830

2931
let manager = Self {
3032
task_timeout,
@@ -47,15 +49,16 @@ impl TaskManager {
4749
// Store the handle
4850
self.running_tasks.write().insert(agent_id, handle.clone());
4951

50-
// Send command to start the task
52+
// Send command to start the task (try_send avoids blocking the caller;
53+
// returns an error if the bounded channel is full).
5154
self.task_sender
52-
.send(TaskCommand::Start {
55+
.try_send(TaskCommand::Start {
5356
task: Box::new(task),
5457
handle,
5558
})
5659
.map_err(|_| SchedulerError::SchedulingFailed {
5760
agent_id,
58-
reason: "Failed to send start command".into(),
61+
reason: "Task channel full — backpressure applied".into(),
5962
})?;
6063

6164
Ok(())
@@ -67,12 +70,12 @@ impl TaskManager {
6770
let handle = self.running_tasks.write().remove(&agent_id);
6871

6972
if let Some(handle) = handle {
70-
// Send termination command
73+
// Send termination command (try_send to avoid blocking).
7174
self.task_sender
72-
.send(TaskCommand::Terminate { agent_id, handle })
75+
.try_send(TaskCommand::Terminate { agent_id, handle })
7376
.map_err(|_| SchedulerError::SchedulingFailed {
7477
agent_id,
75-
reason: "Failed to send terminate command".into(),
78+
reason: "Task channel full — cannot send terminate command".into(),
7679
})?;
7780
}
7881

@@ -131,7 +134,7 @@ impl TaskManager {
131134
}
132135

133136
/// Start the task execution loop
134-
fn start_task_loop(&self, mut task_receiver: mpsc::UnboundedReceiver<TaskCommand>) {
137+
fn start_task_loop(&self, mut task_receiver: mpsc::Receiver<TaskCommand>) {
135138
let task_timeout = self.task_timeout;
136139

137140
tokio::spawn(async move {

crates/runtime/tests/security_tests.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,79 @@ fn test_crypto_different_salts_produce_different_ciphertexts() {
388388
);
389389
}
390390

391+
// ============================================================================
392+
// Type-level Security Tests
393+
// ============================================================================
394+
395+
#[test]
396+
fn test_agent_id_is_unique() {
397+
use symbi_runtime::types::AgentId;
398+
let id1 = AgentId::new();
399+
let id2 = AgentId::new();
400+
assert_ne!(id1, id2);
401+
}
402+
403+
#[test]
404+
fn test_message_id_is_unique() {
405+
use symbi_runtime::types::MessageId;
406+
let id1 = MessageId::new();
407+
let id2 = MessageId::new();
408+
assert_ne!(id1, id2);
409+
}
410+
411+
#[cfg(test)]
412+
mod sandbox_security {
413+
use symbi_runtime::types::SecurityTier;
414+
415+
#[test]
416+
fn test_security_tier_ordering() {
417+
// Higher tiers should provide more isolation
418+
assert!(SecurityTier::Tier3 > SecurityTier::Tier2);
419+
assert!(SecurityTier::Tier2 > SecurityTier::Tier1);
420+
}
421+
}
422+
423+
#[cfg(test)]
424+
mod error_security {
425+
use symbi_runtime::types::error::*;
426+
use symbi_runtime::types::AgentId;
427+
428+
#[test]
429+
fn test_error_messages_dont_leak_secrets() {
430+
let err = SecurityError::AuthenticationFailed("token validation failed".to_string());
431+
let msg = format!("{err}");
432+
// Error message should not contain raw tokens or keys
433+
assert!(!msg.contains("Bearer"));
434+
assert!(!msg.contains("sk-"));
435+
}
436+
437+
#[test]
438+
fn test_boxed_str_error_variants_work() {
439+
let err = ResourceError::AllocationFailed {
440+
agent_id: AgentId::new(),
441+
reason: "insufficient memory".into(),
442+
};
443+
let msg = format!("{err}");
444+
assert!(msg.contains("insufficient memory"));
445+
}
446+
}
447+
448+
#[cfg(test)]
449+
mod communication_security {
450+
use symbi_runtime::types::error::CommunicationError;
451+
452+
#[test]
453+
fn test_message_size_limit() {
454+
let err = CommunicationError::MessageTooLarge {
455+
size: 2_000_000,
456+
max_size: 1_000_000,
457+
};
458+
let msg = format!("{err}");
459+
assert!(msg.contains("2000000"));
460+
assert!(msg.contains("1000000"));
461+
}
462+
}
463+
391464
// ============================================================================
392465
// Token Generation Tests
393466
// ============================================================================

0 commit comments

Comments
 (0)