From a9253a2cc82534b10a285d27e2ae5763252de7ab Mon Sep 17 00:00:00 2001 From: Raffael Rott Date: Sun, 12 Apr 2026 16:20:24 +0200 Subject: [PATCH 1/3] Added basic config --- src/config/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/config/mod.rs b/src/config/mod.rs index a9b2e5f..06a48e7 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -21,3 +21,4 @@ pub fn load_config(path: &str) -> Result { .try_deserialize() .with_context(|| format!("Failed to deserialize config from {}", path)) } + From 64c9fa7a2eab150aff40c203e32a9b4da0385538 Mon Sep 17 00:00:00 2001 From: Michael Debertol Date: Thu, 7 May 2026 17:29:57 +0200 Subject: [PATCH 2/3] implement suggestions - load mappings from a directory - change mapping to be grouped by node - simplify some code --- src/config/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 06a48e7..a9b2e5f 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -21,4 +21,3 @@ pub fn load_config(path: &str) -> Result { .try_deserialize() .with_context(|| format!("Failed to deserialize config from {}", path)) } - From e771287ce6a70157d0d846756dbd69438811768e Mon Sep 17 00:00:00 2001 From: Raffael Rott Date: Sat, 4 Jul 2026 14:38:41 +0200 Subject: [PATCH 3/3] fixed rebases --- src/bin/ff-socket-cli.rs | 345 ++++++++++++++++++ src/config/mod.rs | 21 ++ src/db/mod.rs | 2 +- src/events/mod.rs | 14 +- src/lib.rs | 9 + src/nodes/mod.rs | 2 +- src/nodes/node_manager.rs | 40 +- src/nodes/node_manager/snapshots.rs | 207 +++++++++++ src/socket/PROTOCOL.md | 272 ++++++++++++++ src/socket/mod.rs | 546 ++++++++++++++++++++++++++++ tests/db_logging.rs | 17 +- tests/emulator.rs | 3 +- 12 files changed, 1447 insertions(+), 31 deletions(-) create mode 100644 src/bin/ff-socket-cli.rs create mode 100644 src/nodes/node_manager/snapshots.rs create mode 100644 src/socket/PROTOCOL.md diff --git a/src/bin/ff-socket-cli.rs b/src/bin/ff-socket-cli.rs new file mode 100644 index 0000000..a2192c7 --- /dev/null +++ b/src/bin/ff-socket-cli.rs @@ -0,0 +1,345 @@ +use std::{ + io::{self, ErrorKind, Read, Write}, + net::{SocketAddr, TcpListener, TcpStream}, + sync::mpsc::{self, TryRecvError}, + thread, + time::Duration, +}; + +use anyhow::{Context, Result, bail}; +use serde_json::{Value, json}; + +const DEFAULT_ADDR: &str = "127.0.0.1:8080"; +const POLL_PERIOD: Duration = Duration::from_millis(50); + +fn main() -> Result<()> { + let addr = parse_addr()?; + let listener = TcpListener::bind(addr).with_context(|| format!("failed to bind {addr}"))?; + listener.set_nonblocking(true)?; + + println!("FerroFlow socket CLI listening on {addr}"); + print_help(); + + let (tx, rx) = mpsc::channel(); + thread::spawn(move || stdin_thread(tx)); + + loop { + match wait_for_connection(&listener, &rx)? { + WaitResult::Connected(stream) => { + println!("connected: {}", stream.peer_addr()?); + match connection_loop(stream, &rx)? { + ConnectionResult::Disconnected => { + println!("disconnected; waiting for FerroFlow to reconnect"); + } + ConnectionResult::Quit => return Ok(()), + } + } + WaitResult::Quit => return Ok(()), + } + } +} + +fn parse_addr() -> Result { + let mut args = std::env::args().skip(1); + let mut addr = DEFAULT_ADDR.to_string(); + + while let Some(arg) = args.next() { + match arg.as_str() { + "-a" | "--addr" => { + addr = args + .next() + .with_context(|| format!("{arg} requires an address"))?; + } + "-h" | "--help" => { + println!("Usage: cargo run --bin ff-socket-cli -- [--addr 127.0.0.1:8080]"); + std::process::exit(0); + } + other => addr = other.to_string(), + } + } + + addr.parse() + .with_context(|| format!("invalid socket address {addr}")) +} + +fn stdin_thread(tx: mpsc::Sender) { + let stdin = io::stdin(); + loop { + print!("ff> "); + let _ = io::stdout().flush(); + + let mut line = String::new(); + match stdin.read_line(&mut line) { + Ok(0) => { + let _ = tx.send(CliCommand::Quit); + return; + } + Ok(_) => match parse_command(line.trim()) { + Ok(Some(command)) => { + if tx.send(command).is_err() { + return; + } + } + Ok(None) => {} + Err(error) => eprintln!("command error: {error:#}"), + }, + Err(error) => { + eprintln!("stdin error: {error}"); + let _ = tx.send(CliCommand::Quit); + return; + } + } + } +} + +fn parse_command(line: &str) -> Result> { + let mut parts = line.split_whitespace(); + let Some(command) = parts.next() else { + return Ok(None); + }; + + match command { + "help" | "h" | "?" => Ok(Some(CliCommand::Help)), + "quit" | "q" | "exit" => Ok(Some(CliCommand::Quit)), + "nodes" | "get-nodes" => Ok(Some(CliCommand::Send(json!({ + "type": "get_nodes", + "content": {} + })))), + "telemetry" | "get-telemetry" => Ok(Some(CliCommand::Send(json!({ + "type": "get_telemetry", + "content": {} + })))), + "get-mapped" => { + let name = next_arg(&mut parts, "mapped field name")?; + Ok(Some(CliCommand::Send(json!({ + "type": "get_field", + "content": { + "field": { + "type": "mapped", + "name": name + } + } + })))) + } + "get-raw" => { + let node_name = next_arg(&mut parts, "node name")?; + let field_name = next_arg(&mut parts, "raw field name")?; + Ok(Some(CliCommand::Send(json!({ + "type": "get_field", + "content": { + "field": { + "type": "raw", + "node_name": node_name, + "field_name": field_name + } + } + })))) + } + "set-mapped" => { + let name = next_arg(&mut parts, "mapped parameter name")?; + let value = next_arg(&mut parts, "numeric value")? + .parse::() + .with_context(|| "mapped parameter value must be numeric")?; + Ok(Some(CliCommand::Send(json!({ + "type": "set_parameter", + "content": { + "field": { + "type": "mapped", + "name": name + }, + "value": value + } + })))) + } + "set-raw" => { + let node_name = next_arg(&mut parts, "node name")?; + let field_name = next_arg(&mut parts, "raw field name")?; + let value_text = parts.collect::>().join(" "); + if value_text.is_empty() { + bail!("missing JSON value"); + } + let value = serde_json::from_str::(&value_text) + .with_context(|| "raw parameter value must be valid JSON")?; + Ok(Some(CliCommand::Send(json!({ + "type": "set_parameter", + "content": { + "field": { + "type": "raw", + "node_name": node_name, + "field_name": field_name + }, + "value": value + } + })))) + } + "send-json" | "send" => { + let message = line.strip_prefix(command).unwrap_or_default().trim_start(); + if message.is_empty() { + bail!("missing JSON message"); + } + Ok(Some(CliCommand::Send( + serde_json::from_str(message).with_context(|| "invalid JSON message")?, + ))) + } + other => bail!("unknown command {other}; type 'help'"), + } +} + +fn next_arg<'a>( + parts: &mut impl Iterator, + description: &'static str, +) -> Result<&'a str> { + parts + .next() + .with_context(|| format!("missing {description}")) +} + +fn wait_for_connection( + listener: &TcpListener, + rx: &mpsc::Receiver, +) -> Result { + loop { + match listener.accept() { + Ok((stream, _)) => return Ok(WaitResult::Connected(stream)), + Err(error) if error.kind() == ErrorKind::WouldBlock => {} + Err(error) => return Err(error).with_context(|| "failed to accept connection"), + } + + match rx.try_recv() { + Ok(CliCommand::Quit) => return Ok(WaitResult::Quit), + Ok(CliCommand::Help) => print_help(), + Ok(CliCommand::Send(_)) => { + eprintln!("not connected yet; command was ignored"); + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => return Ok(WaitResult::Quit), + } + + thread::sleep(POLL_PERIOD); + } +} + +fn connection_loop( + mut stream: TcpStream, + rx: &mpsc::Receiver, +) -> Result { + stream.set_nonblocking(true)?; + let mut buffer = Vec::new(); + + loop { + match read_messages(&mut stream, &mut buffer)? { + ReadState::Open(messages) => { + for message in messages { + print_received_message(&message); + } + } + ReadState::Closed => return Ok(ConnectionResult::Disconnected), + } + + loop { + match rx.try_recv() { + Ok(CliCommand::Send(message)) => { + send_message(&mut stream, &message)?; + println!("<- {}", compact_json(&message)); + } + Ok(CliCommand::Help) => print_help(), + Ok(CliCommand::Quit) => return Ok(ConnectionResult::Quit), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return Ok(ConnectionResult::Quit), + } + } + + thread::sleep(POLL_PERIOD); + } +} + +fn read_messages(stream: &mut TcpStream, buffer: &mut Vec) -> Result { + let mut scratch = [0_u8; 4096]; + loop { + match stream.read(&mut scratch) { + Ok(0) => return Ok(ReadState::Closed), + Ok(bytes_read) => buffer.extend_from_slice(&scratch[..bytes_read]), + Err(error) if error.kind() == ErrorKind::Interrupted => continue, + Err(error) if error.kind() == ErrorKind::WouldBlock => break, + Err(error) => return Err(error).with_context(|| "failed to read socket"), + } + } + + let mut messages = Vec::new(); + while buffer.len() >= 2 { + let len = u16::from_be_bytes([buffer[0], buffer[1]]) as usize; + if buffer.len() < len + 2 { + break; + } + + messages.push(buffer[2..len + 2].to_vec()); + buffer.drain(..len + 2); + } + + Ok(ReadState::Open(messages)) +} + +fn send_message(stream: &mut TcpStream, message: &Value) -> Result<()> { + let payload = serde_json::to_vec(message)?; + let len = u16::try_from(payload.len()) + .with_context(|| format!("message too large: {} bytes", payload.len()))?; + + stream.write_all(&len.to_be_bytes())?; + stream.write_all(&payload)?; + Ok(()) +} + +fn print_received_message(message: &[u8]) { + match serde_json::from_slice::(message) { + Ok(value) => println!("-> {}", pretty_json(&value)), + Err(error) => { + println!("-> "); + println!("{}", String::from_utf8_lossy(message)); + } + } +} + +fn pretty_json(value: &Value) -> String { + serde_json::to_string_pretty(value).unwrap_or_else(|_| value.to_string()) +} + +fn compact_json(value: &Value) -> String { + serde_json::to_string(value).unwrap_or_else(|_| value.to_string()) +} + +fn print_help() { + println!( + r#"commands: + nodes request current nodes + telemetry request full telemetry snapshot + get-mapped request mapped field value + get-raw request raw field by device_name + raw field name + set-mapped set mapped parameter using a numeric mapped value + set-raw set raw parameter using a JSON value + send-json send a complete protocol message + help show this help + quit exit +"# + ); +} + +enum CliCommand { + Send(Value), + Help, + Quit, +} + +enum WaitResult { + Connected(TcpStream), + Quit, +} + +enum ConnectionResult { + Disconnected, + Quit, +} + +enum ReadState { + Open(Vec>), + Closed, +} diff --git a/src/config/mod.rs b/src/config/mod.rs index a9b2e5f..60daa59 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -10,6 +10,27 @@ pub struct Config { pub heartbeat_period: u64, pub database_url: String, pub mapping_path: String, + #[serde(default)] + pub webserver_socket: WebserverSocketConfig, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct WebserverSocketConfig { + pub enabled: bool, + pub host: String, + pub port: u16, + pub reconnect_period_ms: u64, +} + +impl Default for WebserverSocketConfig { + fn default() -> Self { + Self { + enabled: false, + host: "127.0.0.1".to_string(), + port: 8080, + reconnect_period_ms: 3000, + } + } } pub fn load_config(path: &str) -> Result { diff --git a/src/db/mod.rs b/src/db/mod.rs index 4b65b83..41df1bf 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -39,7 +39,7 @@ pub fn spawn_logging_worker<'a>( loop { match rx.recv_timeout(flush_timeout) { - Ok(Event::NodeFieldUpdated(log)) => { + Ok(Event::NodeFieldUpdated(log, _)) => { batch.push(log); if batch.len() < batch_size_limit { continue; diff --git a/src/events/mod.rs b/src/events/mod.rs index 8a197b2..5e5ceb5 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -13,7 +13,8 @@ pub enum Event { id: CanMessageId, message: CanMessage, }, - NodeFieldUpdated(crate::db::FieldLog), + NodeFieldUpdated(crate::db::FieldLog, NodeFieldUpdateSource), + NodeListUpdated, Shutdown, #[allow(unused)] SendCanMessage { @@ -33,10 +34,18 @@ pub enum Event { AbortSequence, } +#[derive(Debug, Copy, Clone)] +pub enum NodeFieldUpdateSource { + FieldGetRes, + ParameterSetConfirmation, + TelemetryUpdate, +} + #[derive(Debug, Hash, Eq, PartialEq)] pub enum EventKind { CanMessageReceived, NodeFieldUpdated, + NodeListUpdated, Shutdown, SendCanMessage, RelayCanMessage, @@ -47,7 +56,8 @@ impl From for EventKind { fn from(value: Event) -> Self { match value { Event::CanMessageReceived { .. } => EventKind::CanMessageReceived, - Event::NodeFieldUpdated(_) => EventKind::NodeFieldUpdated, + Event::NodeFieldUpdated(_, _) => EventKind::NodeFieldUpdated, + Event::NodeListUpdated => EventKind::NodeListUpdated, Event::Shutdown => EventKind::Shutdown, Event::SendCanMessage { .. } => EventKind::SendCanMessage, Event::RelayCanMessage { .. } => EventKind::RelayCanMessage, diff --git a/src/lib.rs b/src/lib.rs index a0ef84c..19843e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,15 @@ pub fn run_with_dependencies( scope, ); + if config.webserver_socket.enabled { + socket::spawn_webserver_socket_worker( + config.webserver_socket.clone(), + node_manager, + event_dispatcher, + scope, + ); + } + node_manager.start_node_registration(); Ok(()) diff --git a/src/nodes/mod.rs b/src/nodes/mod.rs index 2657f56..f1a05ef 100644 --- a/src/nodes/mod.rs +++ b/src/nodes/mod.rs @@ -4,7 +4,7 @@ mod can_node; pub mod mapping; mod node_manager; -pub use node_manager::NodeManager; +pub use node_manager::{FieldValueSnapshot, NodeManager, NodeSnapshot, NodeTelemetrySnapshot}; use std::{ sync::mpsc::{self, RecvTimeoutError}, time::{Duration, Instant}, diff --git a/src/nodes/node_manager.rs b/src/nodes/node_manager.rs index 844b477..44e3073 100644 --- a/src/nodes/node_manager.rs +++ b/src/nodes/node_manager.rs @@ -1,4 +1,7 @@ -pub mod command; +mod command; +mod snapshots; + +pub use snapshots::{FieldValueSnapshot, NodeSnapshot, NodeTelemetrySnapshot}; use std::{collections::HashMap, sync::Mutex}; @@ -101,7 +104,7 @@ impl<'a> NodeManager<'a> { let node = CanNode::new(registration_info); if node.node_registration_complete() { - self.can_nodes.insert(node_id, node); + self.complete_node_registration(node_id, node); } else { self.registering_nodes .lock() @@ -144,7 +147,7 @@ impl<'a> NodeManager<'a> { node_id ) })?; - self.can_nodes.insert(node_id, completed_node); + self.complete_node_registration(node_id, completed_node); } Ok(()) } else { @@ -182,7 +185,8 @@ impl<'a> NodeManager<'a> { node_id ) })?; - self.can_nodes.insert(node_id, completed_node); + + self.complete_node_registration(node_id, completed_node); } Ok(()) @@ -257,7 +261,10 @@ impl<'a> NodeManager<'a> { field_value: Self::can_data_value_to_json(value), }; self.event_dispatcher - .dispatch(events::Event::NodeFieldUpdated(telemetry_log)); + .dispatch(events::Event::NodeFieldUpdated( + telemetry_log, + events::NodeFieldUpdateSource::TelemetryUpdate, + )); } Ok(()) @@ -311,7 +318,10 @@ impl<'a> NodeManager<'a> { }; self.event_dispatcher - .dispatch(events::Event::NodeFieldUpdated(telemetry_log)); + .dispatch(events::Event::NodeFieldUpdated( + telemetry_log, + events::NodeFieldUpdateSource::FieldGetRes, + )); Ok(()) } @@ -372,18 +382,10 @@ impl<'a> NodeManager<'a> { &self.can_nodes } - fn can_data_value_to_json(value: CanDataValue) -> serde_json::Value { - match value { - CanDataValue::Float32(v) => serde_json::json!(v), - CanDataValue::Int32(v) => serde_json::json!(v), - CanDataValue::Int16(v) => serde_json::json!(v), - CanDataValue::Int8(v) => serde_json::json!(v), - CanDataValue::UInt32(v) => serde_json::json!(v), - CanDataValue::UInt16(v) => serde_json::json!(v), - CanDataValue::UInt8(v) => serde_json::json!(v), - CanDataValue::Boolean(v) => serde_json::json!(v), - CanDataValue::Raw(items) => serde_json::json!(items), - } + fn complete_node_registration(&self, node_id: u8, node: CanNode) { + self.can_nodes.insert(node_id, node); + self.event_dispatcher + .dispatch(events::Event::NodeListUpdated); } fn is_mapped_name(field_name: &str) -> bool { @@ -955,7 +957,7 @@ mod tests { let mut logs = vec![]; for evt in [evt1, evt2] { match evt { - Event::NodeFieldUpdated(log) => logs.push(log), + Event::NodeFieldUpdated(log, ..) => logs.push(log), other => panic!("unexpected event: {other:?}"), } } diff --git a/src/nodes/node_manager/snapshots.rs b/src/nodes/node_manager/snapshots.rs new file mode 100644 index 0000000..2966d48 --- /dev/null +++ b/src/nodes/node_manager/snapshots.rs @@ -0,0 +1,207 @@ +use anyhow::Result; +use liquidcan::payloads::CanDataValue; + +use crate::nodes::mapping::FieldType; + +use super::super::can_node::CanNode; +use super::NodeManager; + +#[derive(Clone, Debug)] +pub struct NodeSnapshot { + pub id: u8, + pub name: String, + pub fields: Vec, +} + +#[derive(Clone, Debug)] +pub struct FieldSnapshot { + pub id: u8, + pub name: String, + pub raw_name: String, + pub mapped_name: Option, + pub kind: FieldType, +} + +#[derive(Clone, Debug)] +pub struct NodeTelemetrySnapshot { + pub id: u8, + pub name: String, + pub telemetry: Vec, +} + +#[derive(Clone, Debug)] +pub struct FieldValueSnapshot { + pub node_id: u8, + pub id: u8, + pub node_name: String, + pub raw_name: String, + pub mapped_name: Option, + pub name: String, + pub raw: serde_json::Value, + pub value: serde_json::Value, + pub unit: String, + pub logical: serde_json::Value, +} + +impl<'a> NodeManager<'a> { + pub fn nodes_snapshot(&self) -> Vec { + let mut nodes = self + .can_nodes + .iter() + .map(|node| { + let node_id = *node.key(); + let node_name = node.registration_info.device_name.clone(); + let mut fields = Vec::new(); + + for (node_fields, field_type) in [ + (&node.telemetry_fields, FieldType::Telemetry), + (&node.parameter_fields, FieldType::Parameter), + ] { + fields.extend(node_fields.iter().map(|(&id, field)| { + // TODO: we should avoid having to look up the mapping every time + let mapped_name = self + .mapping + .get_mapping_for_raw(&node_name, &field.name) + .map(|mapping| mapping.mapping_entry.name.clone()); + let name = mapped_name.clone().unwrap_or_else(|| field.name.clone()); + FieldSnapshot { + id, + name, + raw_name: field.name.clone(), + mapped_name, + kind: field_type, + } + })) + } + + NodeSnapshot { + id: node_id, + name: node_name, + fields, + } + }) + .collect::>(); + nodes.sort_by_key(|node| node.id); + nodes + } + + pub fn telemetry_snapshot(&self) -> Vec { + self.can_nodes + .iter() + .filter_map(|node| { + let telemetry = node + .values + .iter() + .filter_map(|value| { + self.field_value_snapshot_from_node(*node.key(), &node, *value.key()) + }) + .collect::>(); + + if telemetry.is_empty() { + return None; + } + + Some(NodeTelemetrySnapshot { + id: *node.key(), + name: node.registration_info.device_name.clone(), + telemetry, + }) + }) + .collect() + } + + pub fn field_value_snapshot_by_id( + &self, + node_id: u8, + field_id: u8, + ) -> Option { + self.can_nodes + .get(&node_id) + .and_then(|node| self.field_value_snapshot_from_node(node_id, &node, field_id)) + } + + pub fn field_value_snapshot_by_mapped_name( + &self, + mapped_name: &str, + ) -> Result> { + let (_, target) = self.resolve_mapping_by_name(mapped_name)?; + + Ok(self.field_value_snapshot_by_id(target.node_id, target.field_id)) + } + + pub(super) fn can_data_value_to_json(value: CanDataValue) -> serde_json::Value { + match value { + CanDataValue::Float32(v) => serde_json::json!(v), + CanDataValue::Int32(v) => serde_json::json!(v), + CanDataValue::Int16(v) => serde_json::json!(v), + CanDataValue::Int8(v) => serde_json::json!(v), + CanDataValue::UInt32(v) => serde_json::json!(v), + CanDataValue::UInt16(v) => serde_json::json!(v), + CanDataValue::UInt8(v) => serde_json::json!(v), + CanDataValue::Boolean(v) => serde_json::json!(v), + CanDataValue::Raw(items) => serde_json::json!(items), + } + } + + fn field_value_snapshot_from_node( + &self, + node_id: u8, + node: &CanNode, + field_id: u8, + ) -> Option { + let node_name = &node.registration_info.device_name; + let field = node + .telemetry_fields + .get(&field_id) + .or_else(|| node.parameter_fields.get(&field_id))?; + + let raw_value = node.values.get(&field_id).map(|value| value.1.clone())?; + let raw_json = Self::can_data_value_to_json(raw_value.clone()); + let mapping = self.mapping.get_mapping_for_raw(node_name, &field.name); + + let (mapped_name, name, value, unit, logical) = if let Some(mapping) = mapping { + let mapped = mapping.mapping_entry.mapped_value(&raw_value).ok(); + let value = mapped + .as_ref() + .map(|mapped| serde_json::json!(mapped.value)) + .unwrap_or_else(|| raw_json.clone()); + let unit = mapped + .as_ref() + .map(|mapped| mapped.unit.clone()) + .unwrap_or_default(); + let logical = mapped + .and_then(|mapped| mapping.mapping_entry.logical_value(mapped.value)) + .and_then(|logical| serde_json::to_value(logical.value).ok()) + .unwrap_or(serde_json::Value::Null); + + ( + Some(mapping.mapping_entry.name.clone()), + mapping.mapping_entry.name.clone(), + value, + unit, + logical, + ) + } else { + ( + None, + field.name.clone(), + raw_json.clone(), + String::new(), + serde_json::Value::Null, + ) + }; + + Some(FieldValueSnapshot { + node_id, + id: field_id, + node_name: node_name.clone(), + raw_name: field.name.clone(), + mapped_name, + name, + raw: raw_json, + value, + unit, + logical, + }) + } +} diff --git a/src/socket/PROTOCOL.md b/src/socket/PROTOCOL.md new file mode 100644 index 0000000..45ecea1 --- /dev/null +++ b/src/socket/PROTOCOL.md @@ -0,0 +1,272 @@ +# Socket Protocol + +## Message Format + +Messages are sent over TCP. Each message starts with a 2-byte unsigned big-endian length header, followed by the JSON-encoded message body. + +Every message body is a JSON object: + +```json +{ + "type": "message_type", + "content": { + // message-specific data + }, +} +``` + + + +## Field References + +Fields can be referenced by either their mapped name or their raw name. +Mapped names are unique across all nodes, while raw names are only unique within a node. +Raw names are constructed from the node name and the field name: `:`. +Seperate from this + + + +The value is either the mapped or the raw value, depending on the `value_type` of the field reference. + +Commands that target a field use one of these explicit references: + +mapped value reference, resolved by mapped name: +```json +{ + "value_type": "mapped", + "field_name": "tank_pressure" +} +``` + +mapped value reference, resolved by raw name: +```json +{ + "value_type": "mapped", + "field_name": "ECU:tank_pressure" +} +``` + + +Raw value reference, resolved raw name: + +```json +{ + "value_type": "raw", + "field_name": "ECU:tank_pressure" +} +``` + +Raw value reference, resolved mapped name: + +```json +{ + "value_type": "raw", + "field_name": "tank_pressure" +} +``` + +## Messages From FerroFlow + +### Telemetry + +Sent at connection and on request. Contains all current cached telemetry values for all nodes. + +```json +{ + "type": "telemetry", + "content": { + "timestamp": 1710000000000, + "nodes": [ + { + "id": 5, + "name": "ECU", + "telemetry": [ + { + "id": 10, + "name": "tank_pressure", + "raw_name": "ECU:pressure_adc", + "mapped_name": "tank_pressure", + "raw": 198, + "value": 100.0, + "unit": "bar", + "logical": "High" + } + ] + } + ] + } +} +``` + +If a field has no mapping, `name` equals `raw_name`, `mapped_name` is `null`, `value` equals `raw`, `unit` is empty, and `logical` is `null`. + +### TelemetryDelta + +Sent when telemetry values change. Contains values that changed since the previous `telemetry` or `telemetry_delta` message sent on the socket. + +```json +{ + "type": "telemetry_delta", + "content": { + "prev_timestamp": 1710000000000, + "timestamp": 1710000000020, + "nodes": [ + { + "id": 5, + "name": "ECU", + "telemetry": [ + { + "id": 10, + "name": "tank_pressure", + "raw_name": "ECU:pressure_adc", + "mapped_name": "tank_pressure", + "raw": 199, + "value": 100.5, + "unit": "bar", + "logical": "High" + } + ] + } + ] + } +} +``` + +### Nodes + +Sent at connection, whenever the registered node list changes, and on request. + +```json +{ + "type": "nodes", + "content": { + "nodes": [ + { + "id": 5, + "name": "ECU", + "fields": [ + { + "id": 10, + "type": "telemetry", + "name": "tank_pressure", + "raw_name": "ECU:pressure_adc", + "mapped_name": "tank_pressure" + }, + { + "id": 20, + "type": "parameter", + "name": "valve_opening", + "raw_name": "ECU:valve_raw", + "mapped_name": "valve_opening" + } + ] + } + ] + } +} +``` + +### FieldGetResponse + +Sent in response to a `get_field` command once the value is available. + +```json +{ + "type": "field_get_response", + "content": { + "node_id": 5, + "node_name": "ECU", + "raw_name": "ECU:pressure_adc", + "mapped_name": "tank_pressure", + "name": "tank_pressure", + "raw": 198, + "value": 100.0, + "unit": "bar", + "logical": "High" + } +} +``` + +## Commands To FerroFlow + +### SetParameter + +Sets a parameter value. With a mapped reference, FerroFlow inverse-applies the mapping before sending the raw CAN value. With a raw reference, FerroFlow sends the JSON value converted directly to the registered CAN data type. +The type of value set is defined by the `value_type` of the field reference. + +```json +{ + "type": "set_parameter", + "content": { + "field": { + "value_type": "mapped", + "name": "valve_opening" + }, + "value": 60.0 + } +} +``` + +```json +{ + "type": "set_parameter", + "content": { + "field": { + "value_type": "raw", + "name": "valve_opening" + }, + "value": 100 + } +} +``` + +### GetField + +Requests the current value of a field from a node. +The value is either the raw or the mapped value, depending on the `value_type` of the field reference. + +```json +{ + "type": "get_field", + "content": { + "field": { + "value_type": "raw", + "name": "valve_opening" + } + } +} +``` + +```json +{ + "type": "get_field", + "content": { + "field": { + "value_type": "mapped", + "name": "valve_opening" + } + } +} +``` + +### GetNodes + +Requests a `nodes` message. + +```json +{ + "type": "get_nodes", + "content": {} +} +``` + +### GetTelemetry + +Requests a full `telemetry` message. + +```json +{ + "type": "get_telemetry", + "content": {} +} +``` diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 31f14a7..cfdd894 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -1 +1,547 @@ //! Code for managing socket connections to the frontend. + +use std::{ + io::{ErrorKind, Read, Write}, + net::{SocketAddr, TcpStream, ToSocketAddrs}, + sync::mpsc::{self, TryRecvError}, + thread, + time::Duration, +}; + +use anyhow::{Context, Result, bail}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; + +use crate::{ + config::WebserverSocketConfig, + events::{self, Event, EventKind}, + nodes::{FieldValueSnapshot, NodeManager, NodeTelemetrySnapshot, mapping::FieldType}, +}; + +const SOCKET_POLL_PERIOD: Duration = Duration::from_millis(50); + +pub fn spawn_webserver_socket_worker<'a>( + config: WebserverSocketConfig, + node_manager: &'a NodeManager<'a>, + event_dispatcher: &'a events::EventDispatcher, + scope: &'a thread::Scope<'a, '_>, +) { + let (tx, rx) = mpsc::channel::(); + event_dispatcher.subscribe( + tx, + vec![ + EventKind::Shutdown, + EventKind::NodeFieldUpdated, + EventKind::NodeListUpdated, + ], + "Webserver socket thread", + ); + + scope.spawn(move || socket_worker(config, node_manager, rx)); +} + +fn socket_worker( + config: WebserverSocketConfig, + node_manager: &NodeManager<'_>, + rx: mpsc::Receiver, +) { + let reconnect_period = Duration::from_millis(config.reconnect_period_ms); + let address = match resolve_socket_addr(&config) { + Ok(address) => address, + Err(error) => { + eprintln!("Failed to resolve webserver socket address: {error:#}"); + return; + } + }; + + loop { + match TcpStream::connect(address) { + Ok(mut stream) => { + println!("Connected to webserver socket at {address}"); + if let Err(error) = stream.set_nonblocking(true) { + eprintln!("Failed to configure webserver socket as nonblocking: {error:#}"); + return; + } + + if let Err(error) = connected_socket_worker(&mut stream, node_manager, &rx) { + eprintln!("Error in webserver socket worker: {error:#}"); + } else { + println!("Webserver socket worker exiting gracefully."); + return; + } + } + Err(error) => { + eprintln!("Failed to connect to webserver socket at {address}: {error:#}"); + } + } + + match rx.recv_timeout(reconnect_period) { + Ok(Event::Shutdown) | Err(mpsc::RecvTimeoutError::Disconnected) => return, + Ok(_) | Err(mpsc::RecvTimeoutError::Timeout) => {} + } + } +} + +fn connected_socket_worker( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + rx: &mpsc::Receiver, +) -> Result<()> { + let mut msg_buf = Vec::new(); + let mut last_telemetry_timestamp = Utc::now().timestamp_millis(); + + send_nodes_message(stream, node_manager)?; + send_telemetry_message(stream, node_manager, last_telemetry_timestamp)?; + + loop { + match rx.recv_timeout(SOCKET_POLL_PERIOD) { + Ok(event) => { + handle_event(stream, node_manager, event, &mut last_telemetry_timestamp)?; + } + Err(mpsc::RecvTimeoutError::Timeout) => {} + Err(mpsc::RecvTimeoutError::Disconnected) => return Ok(()), + } + + loop { + match rx.try_recv() { + Ok(Event::Shutdown) => return Ok(()), + Ok(event) => { + handle_event(stream, node_manager, event, &mut last_telemetry_timestamp)?; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return Ok(()), + } + } + + while let Some(msg_len) = read_message(stream, &mut msg_buf)? { + msg_buf.drain(..2); // remove length prefix + handle_command( + stream, + node_manager, + &msg_buf[..msg_len], + &mut last_telemetry_timestamp, + )?; + msg_buf.drain(..msg_len); + } + } +} + +fn handle_event( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + event: Event, + last_telemetry_timestamp: &mut i64, +) -> Result<()> { + match event { + Event::NodeListUpdated => send_nodes_message(stream, node_manager), + Event::NodeFieldUpdated(field_log, source) => { + let Some(field) = node_manager + .field_value_snapshot_by_id(field_log.node_id as u8, field_log.field_id as u8) + else { + return Ok(()); + }; + + let node_id = field_log.node_id as u8; + match source { + events::NodeFieldUpdateSource::FieldGetRes => { + let response = FieldGetResponseContent::from_field(node_id, field); + send_message(stream, "field_get_response", response) + } + events::NodeFieldUpdateSource::ParameterSetConfirmation => { + // TODO + Ok(()) + } + events::NodeFieldUpdateSource::TelemetryUpdate => { + let previous_timestamp = *last_telemetry_timestamp; + let timestamp = Utc::now().timestamp_millis(); + *last_telemetry_timestamp = timestamp; + + let node = NodeTelemetrySnapshot { + id: node_id, + name: field.node_name.clone(), + telemetry: vec![field], + }; + send_message( + stream, + "telemetry_delta", + TelemetryDeltaContent { + prev_timestamp: previous_timestamp, + timestamp, + nodes: telemetry_nodes_from_snapshots(vec![node]), + }, + ) + } + } + } + _ => Ok(()), + } +} + +fn handle_command( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + message: &[u8], + last_telemetry_timestamp: &mut i64, +) -> Result<()> { + let incoming: IncomingMessage = serde_json::from_slice(message) + .with_context(|| "failed to decode webserver command as JSON")?; + + match incoming.message_type.as_str() { + "set_parameter" => { + let command: SetParameterCommand = serde_json::from_value(incoming.content) + .with_context(|| "invalid set_parameter command content")?; + match command.field { + FieldReferenceCommand::Mapped { name } => { + let value = command + .value + .as_f64() + .with_context(|| format!("mapped parameter {name} requires a number"))?; + node_manager.set_mapped_value(&name, value) + } + FieldReferenceCommand::Raw { + node_name: _, + field_name, + } => node_manager.set_raw_value(&field_name, command.value), + } + } + "get_field" => { + let command: GetFieldCommand = serde_json::from_value(incoming.content) + .with_context(|| "invalid get_field command content")?; + request_field(node_manager, &command.field)?; + + Ok(()) + } + "get_nodes" => send_nodes_message(stream, node_manager), + "get_telemetry" => { + let timestamp = Utc::now().timestamp_millis(); + send_telemetry_message(stream, node_manager, timestamp)?; + *last_telemetry_timestamp = timestamp; + Ok(()) + } + other => bail!("unsupported command type {other}"), + } +} + +fn send_nodes_message(stream: &mut TcpStream, node_manager: &NodeManager<'_>) -> Result<()> { + let nodes = node_manager + .nodes_snapshot() + .into_iter() + .map(|node| NodeContent { + id: node.id, + name: node.name, + fields: node + .fields + .into_iter() + .map(|field| FieldContent { + id: field.id, + name: field.name, + raw_name: field.raw_name, + mapped_name: field.mapped_name, + kind: match field.kind { + FieldType::Telemetry => "telemetry", + FieldType::Parameter => "parameter", + }, + }) + .collect(), + }) + .collect(); + send_message(stream, "nodes", NodesContent { nodes }) +} + +fn request_field( + node_manager: &NodeManager<'_>, + field_reference: &FieldReferenceCommand, +) -> Result<()> { + match field_reference { + FieldReferenceCommand::Mapped { name } => node_manager.request_value(name), + FieldReferenceCommand::Raw { + node_name, + field_name: _, + } => node_manager.request_value(node_name), + } +} + +fn send_telemetry_message( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + timestamp: i64, +) -> Result<()> { + send_message( + stream, + "telemetry", + TelemetryContent { + timestamp, + nodes: telemetry_nodes_from_snapshots(node_manager.telemetry_snapshot()), + }, + ) +} + +fn telemetry_nodes_from_snapshots( + snapshots: Vec, +) -> Vec { + snapshots + .into_iter() + .map(|node| TelemetryNodeContent { + id: node.id, + name: node.name, + telemetry: node + .telemetry + .into_iter() + .map(TelemetryFieldContent::from) + .collect(), + }) + .collect() +} + +fn send_message( + stream: &mut TcpStream, + message_type: &'static str, + content: T, +) -> Result<()> { + let payload = serde_json::to_vec(&OutgoingMessage { + message_type, + content, + })?; + let len = u16::try_from(payload.len()) + .with_context(|| format!("socket message too large: {} bytes", payload.len()))?; + + stream.write_all(&len.to_be_bytes())?; + stream.write_all(&payload)?; + Ok(()) +} + +/// Attempts to read a single message from the socket. Returns true if a message was placed in the message buffer, false otherwise. +fn read_message(stream: &mut TcpStream, message_buffer: &mut Vec) -> Result> { + // Do we already have a full message in the buffer from a previous read? + if let Some(msg_len) = buffer_contains_full_message(message_buffer) { + return Ok(Some(msg_len)); + } + + // Read more data + let mut scratch = [0_u8; 4096]; + + loop { + match stream.read(&mut scratch) { + Ok(0) => break, // EOF, socket closed + Ok(bytes_read) => message_buffer.extend_from_slice(&scratch[..bytes_read]), + Err(error) if error.kind() == ErrorKind::Interrupted => continue, + Err(error) if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => { + break; + } + Err(error) => return Err(error.into()), + } + } + + // Do we have a full message now? + if let Some(msg_len) = buffer_contains_full_message(message_buffer) { + Ok(Some(msg_len)) + } else { + Ok(None) + } +} + +fn message_len_from_buffer(buffer: &[u8]) -> Option { + if buffer.len() < 2 { + None + } else { + Some(u16::from_be_bytes([buffer[0], buffer[1]]) as usize) + } +} + +fn buffer_contains_full_message(buffer: &[u8]) -> Option { + let len = message_len_from_buffer(buffer); + if let Some(len) = len + && buffer.len() >= len + 2 + { + Some(len) + } else { + None + } +} + +fn resolve_socket_addr(config: &WebserverSocketConfig) -> Result { + (config.host.as_str(), config.port) + .to_socket_addrs()? + .next() + .with_context(|| format!("{}:{} resolved to no addresses", config.host, config.port)) +} + +#[derive(Deserialize)] +struct IncomingMessage { + #[serde(rename = "type")] + message_type: String, + content: serde_json::Value, +} + +#[derive(Deserialize)] +struct SetParameterCommand { + field: FieldReferenceCommand, + value: serde_json::Value, +} + +#[derive(Deserialize)] +struct GetFieldCommand { + field: FieldReferenceCommand, +} + +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +enum FieldReferenceCommand { + Mapped { + name: String, + }, + Raw { + node_name: String, + field_name: String, + }, +} + +#[derive(Serialize)] +struct OutgoingMessage { + #[serde(rename = "type")] + message_type: &'static str, + content: T, +} + +#[derive(Serialize)] +struct TelemetryContent { + timestamp: i64, + nodes: Vec, +} + +#[derive(Serialize)] +struct TelemetryDeltaContent { + prev_timestamp: i64, + timestamp: i64, + nodes: Vec, +} + +#[derive(Serialize)] +struct TelemetryNodeContent { + id: u8, + name: String, + telemetry: Vec, +} + +#[derive(Serialize)] +struct TelemetryFieldContent { + id: u8, + name: String, + raw_name: String, + mapped_name: Option, + raw: serde_json::Value, + value: serde_json::Value, + unit: String, + logical: serde_json::Value, +} + +impl From for TelemetryFieldContent { + fn from(value: FieldValueSnapshot) -> Self { + Self { + id: value.id, + name: value.name, + raw_name: value.raw_name, + mapped_name: value.mapped_name, + raw: value.raw, + value: value.value, + unit: value.unit, + logical: value.logical, + } + } +} + +#[derive(Serialize)] +struct NodesContent { + nodes: Vec, +} + +#[derive(Serialize)] +struct NodeContent { + id: u8, + name: String, + fields: Vec, +} + +#[derive(Serialize)] +struct FieldContent { + id: u8, + name: String, + raw_name: String, + mapped_name: Option, + #[serde(rename = "type")] + kind: &'static str, +} + +#[derive(Serialize)] +struct FieldGetResponseContent { + node_id: u8, + node_name: String, + raw_name: String, + mapped_name: Option, + name: String, + raw: serde_json::Value, + value: serde_json::Value, + unit: String, + logical: serde_json::Value, +} + +impl FieldGetResponseContent { + fn from_field(node_id: u8, field: FieldValueSnapshot) -> Self { + Self { + node_id, + node_name: field.node_name, + raw_name: field.raw_name, + mapped_name: field.mapped_name, + name: field.name, + raw: field.raw, + value: field.value, + unit: field.unit, + logical: field.logical, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_mapped_field_command_reference() { + let command: GetFieldCommand = serde_json::from_value(serde_json::json!({ + "field": { + "type": "mapped", + "name": "tank_pressure" + } + })) + .expect("mapped command should parse"); + + assert_eq!( + command.field, + FieldReferenceCommand::Mapped { + name: "tank_pressure".to_string() + } + ); + } + + #[test] + fn parses_raw_field_command_reference() { + let command: SetParameterCommand = serde_json::from_value(serde_json::json!({ + "field": { + "type": "raw", + "node_name": "ECU", + "field_name": "valve_raw" + }, + "value": 42 + })) + .expect("raw command should parse"); + + assert_eq!( + command.field, + FieldReferenceCommand::Raw { + node_name: "ECU".to_string(), + field_name: "valve_raw".to_string() + } + ); + assert_eq!(command.value, serde_json::json!(42)); + } +} diff --git a/tests/db_logging.rs b/tests/db_logging.rs index a91be77..ed8e515 100644 --- a/tests/db_logging.rs +++ b/tests/db_logging.rs @@ -28,13 +28,16 @@ fn logging_worker_persists_events_to_timescaledb() { db::spawn_logging_worker(database_url.clone(), &event_dispatcher, scope) .expect("logging worker should start"); - event_dispatcher.dispatch(events::Event::NodeFieldUpdated(db::FieldLog { - timestamp: Utc::now(), - node_id: 3, - field_id: 99, - field_name: "tank_pressure".into(), - field_value: json!(17.4), - })); + event_dispatcher.dispatch(events::Event::NodeFieldUpdated( + db::FieldLog { + timestamp: Utc::now(), + node_id: 3, + field_id: 99, + field_name: "tank_pressure".into(), + field_value: json!(17.4), + }, + events::NodeFieldUpdateSource::TelemetryUpdate, + )); event_dispatcher.dispatch(events::Event::Shutdown); }); diff --git a/tests/emulator.rs b/tests/emulator.rs index 0623760..bde8108 100644 --- a/tests/emulator.rs +++ b/tests/emulator.rs @@ -2,7 +2,7 @@ mod common; use crate::common::ShutdownGuard; use chrono::{DateTime, Utc}; -use ferro_flow::config::Config; +use ferro_flow::config::{Config, WebserverSocketConfig}; use ferro_flow::nodes::mapping::Mapping; use ferro_flow::{events, nodes, run_with_dependencies}; use liquidcan::payloads::CanDataType; @@ -187,6 +187,7 @@ fn build_test_config(can_iface: &str) -> Config { heartbeat_period: 1, database_url: "".to_string(), mapping_path: "".to_string(), + webserver_socket: WebserverSocketConfig::default(), } }