diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 97a3720..64cea8f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -63,7 +63,7 @@ jobs: ~/.cargo/registry/cache/ ~/.cargo/git/db/ target/ - key: ${{ runner.os }}-cargo-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} + key: ${{ runner.os }}-cargo-${{ matrix.target }}-${{ hashFiles('Cargo.lock') }} restore-keys: | ${{ runner.os }}-cargo-${{ matrix.target }}- diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 08bfaa4..13fbb5b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -66,7 +66,7 @@ jobs: ~/.cargo/registry/cache/ ~/.cargo/git/db/ target/ - key: ${{ runner.os }}-cargo-${{ matrix.target }}-${{ hashFiles('**/Cargo.lock') }} + key: ${{ runner.os }}-cargo-${{ matrix.target }}-${{ hashFiles('Cargo.lock') }} restore-keys: | ${{ runner.os }}-cargo-${{ matrix.target }}- diff --git a/examples/flow-news.ro b/examples/flow-news.ro new file mode 100644 index 0000000..cf71c8b --- /dev/null +++ b/examples/flow-news.ro @@ -0,0 +1,14 @@ +flow fetchNews { + step search { + prompt "Get top 5 tech news today" + output { headlines: string[] } + } +} + +flow summarizeNews { + uses fetchNews + step summarize { + prompt "Summarize these headlines in 3 sentences" + output { summary: string } + } +} diff --git a/rohas-cli/src/main.rs b/rohas-cli/src/main.rs index 5f7eaa7..15357a7 100644 --- a/rohas-cli/src/main.rs +++ b/rohas-cli/src/main.rs @@ -4,7 +4,7 @@ use rohas_core::{Lexer, Parser}; use rohas_flow::FlowEngine; use rohas_llm::{LLMProvider, ProviderConfig}; use rohas_optimizer::TokenSaver; -use rohas_runtime::{Executor, Value}; +use rohas_runtime::Executor; use std::fs; use std::path::PathBuf; @@ -113,6 +113,11 @@ fn main() -> anyhow::Result<()> { } let flow_engine = FlowEngine::new(executor); + for statement in &program.statements { + if let rohas_core::ast::Statement::FlowStatement { name, .. } = statement { + flow_engine.register_flow(name.clone(), statement.clone()); + } + } let mut executor_guard = flow_engine.executor.lock().unwrap(); let mut results = Vec::new(); diff --git a/rohas-core/src/ast.rs b/rohas-core/src/ast.rs index 69d8dd7..98d28dd 100644 --- a/rohas-core/src/ast.rs +++ b/rohas-core/src/ast.rs @@ -83,6 +83,7 @@ pub enum Statement { FlowStatement { name: String, + uses: Vec, steps: Vec, }, StepStatement { @@ -115,6 +116,7 @@ pub struct FlowStep { pub parallel: bool, pub condition: Option, pub retry: Option, + pub output: Option, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] diff --git a/rohas-core/src/lexer.rs b/rohas-core/src/lexer.rs index dc74d91..e684d02 100644 --- a/rohas-core/src/lexer.rs +++ b/rohas-core/src/lexer.rs @@ -25,6 +25,7 @@ impl Lexer { keywords.insert("async".to_string(), Token::Async); keywords.insert("await".to_string(), Token::Await); keywords.insert("use".to_string(), Token::Use); + keywords.insert("uses".to_string(), Token::Use); keywords.insert("import".to_string(), Token::Import); keywords.insert("export".to_string(), Token::Export); keywords.insert("type".to_string(), Token::Type); @@ -42,6 +43,7 @@ impl Lexer { keywords.insert("state".to_string(), Token::State); keywords.insert("print".to_string(), Token::Print); keywords.insert("input".to_string(), Token::Input); + keywords.insert("output".to_string(), Token::Output); keywords.insert("flow".to_string(), Token::Flow); keywords.insert("step".to_string(), Token::Step); diff --git a/rohas-core/src/parser.rs b/rohas-core/src/parser.rs index c9ff1e1..eac473e 100644 --- a/rohas-core/src/parser.rs +++ b/rohas-core/src/parser.rs @@ -68,6 +68,7 @@ impl Parser { Some(Token::ToolCall) => self.parse_tool_call_statement(), Some(Token::Call) => self.parse_call_statement(), Some(Token::Print) => self.parse_print_statement(), + Some(Token::Output) => self.parse_output_statement(), Some(Token::Identifier(_)) => { let start_pos = self.current; @@ -254,7 +255,7 @@ impl Parser { fn parse_use_statement(&mut self) -> Result { self.consume(Token::Use)?; self.skip_whitespace(); - let name = self.consume_identifier()?; + let _name = self.consume_identifier()?; self.skip_whitespace(); self.consume_semicolon_optional(); @@ -351,6 +352,53 @@ impl Parser { Ok(Statement::PrintStatement { expression }) } + fn parse_output_statement(&mut self) -> Result { + self.consume(Token::Output)?; + self.skip_whitespace(); + self.consume(Token::LeftBrace)?; + self.skip_whitespace(); + let mut properties = HashMap::new(); + while !self.check(Token::RightBrace) { + let key = self.consume_identifier()?; + self.skip_whitespace(); + self.consume(Token::Colon)?; + self.skip_whitespace(); + let value = if let Some(Token::Identifier(_)) = self.peek() { + let start_pos = self.current; + let type_name = self.consume_identifier()?; + let is_array = self.match_token(Token::LeftBracket); + if is_array { + self.consume(Token::RightBracket)?; + if self.check(Token::LeftParen) || self.check(Token::Dot) || self.check(Token::LeftBracket) { + self.current = start_pos; + self.parse_expression()? + } else { + Expression::Literal(Literal::String(format!("{}[]", type_name))) + } + } else { + if self.check(Token::LeftParen) || self.check(Token::Dot) || self.check(Token::LeftBracket) { + self.current = start_pos; + self.parse_expression()? + } else { + Expression::Literal(Literal::String(type_name)) + } + } + } else { + self.parse_expression()? + }; + properties.insert(key, value); + self.skip_whitespace(); + if !self.check(Token::RightBrace) { + self.match_token(Token::Comma); + self.skip_whitespace(); + } + } + self.consume(Token::RightBrace)?; + Ok(Statement::ExpressionStatement { + expression: Expression::ObjectLiteral { properties }, + }) + } + fn parse_parallel_step_statement(&mut self) -> Result { self.consume(Token::Parallel)?; @@ -688,8 +736,30 @@ impl Parser { self.consume(Token::Flow)?; let name = self.consume_identifier()?; self.consume(Token::LeftBrace)?; + let mut uses = Vec::new(); let mut steps = Vec::new(); + while !self.check(Token::RightBrace) { + self.skip_whitespace(); + if self.check(Token::RightBrace) { + break; + } + + if self.match_token(Token::Use) { + let flow_name = self.consume_identifier()?; + uses.push(flow_name); + self.skip_whitespace(); + self.consume_semicolon_optional(); + continue; + } + + if self.check(Token::Step) || self.check(Token::Parallel) { + break; + } + + break; + } + while !self.check(Token::RightBrace) { self.skip_whitespace(); if self.check(Token::RightBrace) { @@ -738,7 +808,25 @@ impl Parser { None }; self.consume(Token::LeftBrace)?; - let statements = self.parse_block()?; + let mut statements = Vec::new(); + let mut output = None; + + while !self.check(Token::RightBrace) && !self.is_at_end() { + self.skip_whitespace(); + if self.check(Token::RightBrace) { + break; + } + + if self.check(Token::Output) { + let output_stmt = self.parse_output_statement()?; + if let Statement::ExpressionStatement { expression } = output_stmt { + output = Some(expression); + } + } else { + statements.push(self.parse_statement()?); + } + } + self.consume(Token::RightBrace)?; steps.push(FlowStep { name: step_name, @@ -746,11 +834,12 @@ impl Parser { parallel, condition, retry, + output, }); } self.consume(Token::RightBrace)?; - Ok(Statement::FlowStatement { name, steps }) + Ok(Statement::FlowStatement { name, uses, steps }) } fn parse_step_statement(&mut self) -> Result { diff --git a/rohas-core/src/token.rs b/rohas-core/src/token.rs index e24eacc..1e3af08 100644 --- a/rohas-core/src/token.rs +++ b/rohas-core/src/token.rs @@ -38,6 +38,7 @@ pub enum Token { State, Print, Input, + Output, Flow, Step, @@ -126,6 +127,7 @@ impl Token { | Token::State | Token::Print | Token::Input + | Token::Output | Token::Flow | Token::Step | Token::Parallel diff --git a/rohas-flow/src/engine.rs b/rohas-flow/src/engine.rs index aad80ab..07e9c61 100644 --- a/rohas-flow/src/engine.rs +++ b/rohas-flow/src/engine.rs @@ -1,10 +1,11 @@ use crate::memory::Memory; use crate::state::State; use crate::tools::ToolRegistry; -use anyhow::{Context, Result}; +use anyhow::Result; use rohas_core::ast::*; use rohas_runtime::value::Value; use rohas_runtime::Executor; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; pub struct FlowEngine { @@ -12,6 +13,7 @@ pub struct FlowEngine { state: Arc>, memory: Arc>, tools: Arc>, + flows: Arc>>, } impl FlowEngine { @@ -21,13 +23,33 @@ impl FlowEngine { state: Arc::new(Mutex::new(State::new())), memory: Arc::new(Mutex::new(Memory::new(100))), tools: Arc::new(Mutex::new(ToolRegistry::new())), + flows: Arc::new(Mutex::new(HashMap::new())), } } + pub fn register_flow(&self, name: String, flow: Statement) { + let mut flows = self.flows.lock().unwrap(); + flows.insert(name, flow); + } + pub fn execute_flow(&self, flow_statement: &Statement) -> Result> { - if let Statement::FlowStatement { name, steps } = flow_statement { + if let Statement::FlowStatement { name: _name, uses, steps } = flow_statement { let mut results = Vec::new(); + for used_flow_name in uses { + let used_flow = { + let flows = self.flows.lock().unwrap(); + flows.get(used_flow_name).cloned() + }; + + if let Some(used_flow) = used_flow { + let used_results = self.execute_flow(&used_flow)?; + results.extend(used_results); + } else { + return Err(anyhow::anyhow!("Flow '{}' not found (used by '{}')", used_flow_name, _name)); + } + } + for step in steps { if let Some(condition) = &step.condition { @@ -56,14 +78,34 @@ impl FlowEngine { } } } else { - + let mut step_results = Vec::new(); + let mut last_prompt_result: Option = None; + for stmt in &step.statements { let mut executor = self.executor.lock().unwrap(); let result = executor.execute_statement(stmt)?; drop(executor); if let Some(value) = result { - results.push(value); + if let Value::String(_) = &value { + last_prompt_result = Some(value.clone()); + } + step_results.push(value); + } + } + + if let (Some(output_expr), Some(prompt_result)) = (&step.output, last_prompt_result) { + if let Value::String(response_text) = prompt_result { + if let Expression::ObjectLiteral { properties } = output_expr { + let formatted = self.format_output(&response_text, properties)?; + results.push(formatted); + } else { + results.extend(step_results); + } + } else { + results.extend(step_results); } + } else { + results.extend(step_results); } } @@ -147,4 +189,88 @@ impl FlowEngine { let mut state = self.state.lock().unwrap(); state.set_variable(key, value); } + + fn format_output(&self, response_text: &str, output_schema: &std::collections::HashMap) -> Result { + use std::collections::HashMap; + + let json_value: Result = serde_json::from_str(response_text); + + let mut output_obj = HashMap::new(); + + if let Ok(json) = json_value { + if let serde_json::Value::Object(obj) = json { + for (key, _type_expr) in output_schema { + if let Some(value) = obj.get(key) { + output_obj.insert(key.clone(), self.json_value_to_value(value)?); + } + } + } + } else { + for (key, type_expr) in output_schema { + if let Expression::Literal(Literal::String(type_str)) = type_expr { + if type_str.ends_with("[]") { + if let Some(array_start) = response_text.find('[') { + if let Some(array_end) = response_text[array_start..].find(']') { + let array_str = &response_text[array_start..array_start + array_end + 1]; + if let Ok(json_array) = serde_json::from_str::(array_str) { + if let serde_json::Value::Array(arr) = json_array { + let values: Vec = arr.iter() + .filter_map(|v| self.json_value_to_value(v).ok()) + .collect(); + output_obj.insert(key.clone(), Value::Array(values)); + continue; + } + } + } + } + let items: Vec = response_text + .lines() + .filter_map(|line| { + let line = line.trim(); + let cleaned = line + .trim_start_matches(|c: char| c.is_ascii_digit() || c == '.' || c == '-' || c == '*') + .trim(); + if !cleaned.is_empty() && cleaned.len() > 3 { + Some(Value::String(cleaned.to_string())) + } else { + None + } + }) + .take(10) + .collect(); + if !items.is_empty() { + output_obj.insert(key.clone(), Value::Array(items)); + } + } else { + output_obj.insert(key.clone(), Value::String(response_text.to_string())); + } + } + } + } + + Ok(Value::Object(output_obj)) + } + + fn json_value_to_value(&self, json: &serde_json::Value) -> Result { + use std::collections::HashMap; + match json { + serde_json::Value::String(s) => Ok(Value::String(s.clone())), + serde_json::Value::Number(n) => Ok(Value::Number(n.as_f64().unwrap_or(0.0))), + serde_json::Value::Bool(b) => Ok(Value::Boolean(*b)), + serde_json::Value::Null => Ok(Value::Null), + serde_json::Value::Array(arr) => { + let values: Result, _> = arr.iter() + .map(|v| self.json_value_to_value(v)) + .collect(); + Ok(Value::Array(values?)) + }, + serde_json::Value::Object(obj) => { + let mut map = HashMap::new(); + for (k, v) in obj { + map.insert(k.clone(), self.json_value_to_value(v)?); + } + Ok(Value::Object(map)) + }, + } + } }