Skip to content

Commit dab2dcb

Browse files
committed
fix(cortex-engine): handle SIGPIPE by detecting closed event channel
Fixes bounty issue #1525 When cortex run is piped to a command that closes the pipe (e.g., head -n 1), the session event channel could return a Closed error. Previously this was ignored, causing the session to keep running. Now we detect TrySendError::Closed and properly shut down the session.
1 parent 8f839ec commit dab2dcb

File tree

2 files changed

+30
-14
lines changed

2 files changed

+30
-14
lines changed

cortex-engine/src/session.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::error::{CortexError, Result};
3030
use crate::rollout::reader::{RolloutItem, get_events, get_session_meta};
3131
use crate::rollout::recorder::SessionMeta;
3232
use crate::rollout::{RolloutRecorder, SESSIONS_SUBDIR, get_rollout_path, read_rollout};
33-
use crate::skills::{SkillsManager, render_skills_section, try_get_skills_manager};
33+
use crate::skills::{render_skills_section, try_get_skills_manager};
3434
use crate::summarization::SummarizationStrategy;
3535
use crate::tools::context::ToolOutputChunk;
3636
use crate::tools::{ToolContext, ToolRouter};
@@ -205,7 +205,10 @@ impl Session {
205205
// Initialize with system prompt (including skills if available)
206206
let skills_section = load_skills_section(&config.cwd);
207207
let mut messages = Vec::new();
208-
messages.push(Message::system(build_system_prompt_with_skills(&config, skills_section.as_deref())));
208+
messages.push(Message::system(build_system_prompt_with_skills(
209+
&config,
210+
skills_section.as_deref(),
211+
)));
209212

210213
// Initialize snapshot manager
211214
let snapshot_dir = config
@@ -1355,8 +1358,21 @@ impl Session {
13551358
let _ = recorder.record_event(&event);
13561359
}
13571360

1358-
// Non-blocking send - never wait
1359-
let _ = self.event_tx.try_send(event);
1361+
// Non-blocking send
1362+
// If the channel is closed (receiver dropped), we should stop the session
1363+
match self.event_tx.try_send(event) {
1364+
Ok(_) => {}
1365+
Err(async_channel::TrySendError::Full(_)) => {
1366+
// Channel full - dropping event (shouldn't happen with unbounded)
1367+
tracing::warn!("Event channel full, dropping event");
1368+
}
1369+
Err(async_channel::TrySendError::Closed(_)) => {
1370+
// Channel closed - receiver is gone (e.g. pipe broken)
1371+
tracing::info!("Event channel closed, stopping session");
1372+
self.running = false;
1373+
self.cancelled.store(true, Ordering::SeqCst);
1374+
}
1375+
}
13601376
}
13611377

13621378
/// Resume a session from a rollout file.
@@ -1794,12 +1810,12 @@ fn build_system_prompt_with_skills(config: &Config, skills_section: Option<&str>
17941810
}
17951811

17961812
// Add skills section if provided
1797-
if let Some(skills) = skills_section {
1798-
if !skills.is_empty() {
1799-
additional.push_str("\n## Available Skills\n");
1800-
additional.push_str(skills);
1801-
additional.push('\n');
1802-
}
1813+
if let Some(skills) = skills_section
1814+
&& !skills.is_empty()
1815+
{
1816+
additional.push_str("\n## Available Skills\n");
1817+
additional.push_str(skills);
1818+
additional.push('\n');
18031819
}
18041820

18051821
prompt = prompt.replace("{{ADDITIONAL_CONTEXT}}", &additional);
@@ -1897,14 +1913,14 @@ fn get_system_info() -> String {
18971913
fn load_skills_section(cwd: &PathBuf) -> Option<String> {
18981914
// Try to get the global skills manager
18991915
let manager = try_get_skills_manager()?;
1900-
1916+
19011917
// Load skills for current working directory (synchronously)
19021918
let outcome = manager.skills_for_cwd_sync(cwd, false);
1903-
1919+
19041920
if outcome.skills.is_empty() {
19051921
return None;
19061922
}
1907-
1923+
19081924
// Render skills section for system prompt
19091925
render_skills_section(&outcome.skills)
19101926
}

cortex-engine/src/tasks/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ impl SnapshotManager {
292292
match state {
293293
FileState::Exists {
294294
content,
295-
permissions: _,
295+
permissions,
296296
..
297297
} => {
298298
// Create parent directories

0 commit comments

Comments
 (0)