-
Notifications
You must be signed in to change notification settings - Fork 22
feat: add streaming ATOF exporter #172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,8 +10,11 @@ | |
|
|
||
| use std::fs::{File, OpenOptions}; | ||
| use std::io::{BufWriter, Write}; | ||
| use std::net::{Shutdown, TcpStream}; | ||
| use std::path::{Path, PathBuf}; | ||
| use std::sync::{Arc, Mutex}; | ||
| use std::sync::{Arc, Mutex, mpsc}; | ||
| use std::thread::JoinHandle; | ||
| use std::time::Duration; | ||
|
|
||
| use chrono::Utc; | ||
|
|
||
|
|
@@ -45,6 +48,28 @@ pub enum AtofExporterError { | |
| /// Underlying I/O error. | ||
| source: std::io::Error, | ||
| }, | ||
| /// Failed to connect to an ATOF stream receiver. | ||
| #[error("failed to connect to ATOF stream receiver {address}: {source}")] | ||
| ConnectStream { | ||
| /// Address that failed to connect. | ||
| address: String, | ||
| /// Underlying I/O error. | ||
| source: std::io::Error, | ||
| }, | ||
| /// Failed to configure the ATOF stream connection. | ||
| #[error( | ||
| "failed to configure ATOF stream receiver {address} with {operation} (ATOF_STREAM_WRITE_TIMEOUT={timeout:?}): {source}" | ||
| )] | ||
| ConfigureStream { | ||
| /// Address associated with the stream. | ||
| address: String, | ||
| /// Stream option that failed. | ||
| operation: &'static str, | ||
| /// Write timeout used when configuring the stream. | ||
| timeout: Option<Duration>, | ||
| /// Underlying I/O error. | ||
| source: std::io::Error, | ||
| }, | ||
| /// The exporter recorded an earlier write or serialization error. | ||
| #[error("previous ATOF export failed for {path:?}: {message}")] | ||
| StoredFailure { | ||
|
|
@@ -53,6 +78,14 @@ pub enum AtofExporterError { | |
| /// Stored failure message. | ||
| message: String, | ||
| }, | ||
| /// The streaming exporter recorded an earlier write or serialization error. | ||
| #[error("previous ATOF stream export failed for {address}: {message}")] | ||
| StoredStreamFailure { | ||
| /// Address associated with the stream. | ||
| address: String, | ||
| /// Stored failure message. | ||
| message: String, | ||
| }, | ||
| /// The internal exporter state lock was poisoned. | ||
| #[error("the ATOF exporter state lock was poisoned")] | ||
| LockPoisoned, | ||
|
|
@@ -225,6 +258,302 @@ impl AtofExporter { | |
| } | ||
| } | ||
|
|
||
| /// Configuration for [`AtofStreamingExporter`]. | ||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| pub struct AtofStreamingExporterConfig { | ||
| /// TCP address for a separate local process that receives ATOF JSONL events. | ||
| pub address: String, | ||
| } | ||
|
|
||
| impl AtofStreamingExporterConfig { | ||
| /// Create a streaming exporter config for the given TCP address. | ||
| pub fn new(address: impl Into<String>) -> Self { | ||
| Self { | ||
| address: address.into(), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| const ATOF_STREAM_QUEUE_BOUND: usize = 1024; | ||
| const ATOF_STREAM_WRITE_TIMEOUT: Duration = Duration::from_secs(2); | ||
|
|
||
| enum AtofStreamMessage { | ||
| Event(String), | ||
| Flush(mpsc::Sender<std::result::Result<(), String>>), | ||
| Shutdown(mpsc::Sender<std::result::Result<(), String>>), | ||
| } | ||
|
|
||
| struct AtofStreamingExporterState { | ||
| sender: Option<mpsc::SyncSender<AtofStreamMessage>>, | ||
| writer_thread: Option<JoinHandle<()>>, | ||
| events_sent: u64, | ||
| last_error: Arc<Mutex<Option<String>>>, | ||
| } | ||
|
|
||
| /// Snapshot of [`AtofStreamingExporter`] delivery state. | ||
| #[derive(Debug, Clone, Default, PartialEq, Eq)] | ||
| pub struct AtofStreamingExporterStats { | ||
| /// Number of ATOF events observed by the streaming exporter. | ||
| pub events_sent: u64, | ||
| /// Most recent serialization or exporter state error, if one was recorded. | ||
| pub last_error: Option<String>, | ||
| } | ||
|
|
||
| /// TCP-backed Agent Trajectory Observability Format (ATOF) event stream exporter. | ||
| /// | ||
| /// The exporter exposes a regular NeMo Relay event subscriber and writes each | ||
| /// canonical ATOF JSON value as one JSONL line to a separate local process over | ||
| /// a TCP connection. A local UI, CLI, or bridge process can own the receiving | ||
| /// socket and fan events out over HTTP, SSE, WebSocket, stdout, or another | ||
| /// transport without redefining the ATOF event contract. | ||
| #[derive(Clone)] | ||
| pub struct AtofStreamingExporter { | ||
| address: String, | ||
| state: Arc<Mutex<AtofStreamingExporterState>>, | ||
| } | ||
|
|
||
| impl AtofStreamingExporter { | ||
| /// Connect to a separate local ATOF stream receiver. | ||
| pub fn new(config: AtofStreamingExporterConfig) -> Result<Self> { | ||
| let address = config.address; | ||
| let stream = | ||
| TcpStream::connect(&address).map_err(|source| AtofExporterError::ConnectStream { | ||
| address: address.clone(), | ||
| source, | ||
| })?; | ||
| stream | ||
| .set_nodelay(true) | ||
| .map_err(|source| AtofExporterError::ConfigureStream { | ||
| address: address.clone(), | ||
| operation: "set_nodelay", | ||
| timeout: None, | ||
| source, | ||
| })?; | ||
| stream | ||
| .set_write_timeout(Some(ATOF_STREAM_WRITE_TIMEOUT)) | ||
| .map_err(|source| AtofExporterError::ConfigureStream { | ||
| address: address.clone(), | ||
| operation: "set_write_timeout", | ||
| timeout: Some(ATOF_STREAM_WRITE_TIMEOUT), | ||
| source, | ||
| })?; | ||
| let (sender, receiver) = mpsc::sync_channel(ATOF_STREAM_QUEUE_BOUND); | ||
| let last_error = Arc::new(Mutex::new(None)); | ||
| let writer_error = Arc::clone(&last_error); | ||
| let writer_thread = std::thread::spawn(move || { | ||
| let mut writer = BufWriter::new(stream); | ||
| while let Ok(message) = receiver.recv() { | ||
| match message { | ||
| AtofStreamMessage::Event(value) => { | ||
| if let Err(error) = write_serialized_event(&mut writer, &value) { | ||
| store_stream_error(&writer_error, error); | ||
| } | ||
| } | ||
| AtofStreamMessage::Flush(reply) => { | ||
| let result = writer.flush().map_err(|error| error.to_string()); | ||
| if let Err(error) = &result { | ||
| store_stream_error(&writer_error, error.clone()); | ||
| } | ||
| let _ = reply.send(result); | ||
| } | ||
| AtofStreamMessage::Shutdown(reply) => { | ||
| let result = writer.flush().map_err(|error| error.to_string()); | ||
| if let Err(error) = &result { | ||
| store_stream_error(&writer_error, error.clone()); | ||
| } | ||
| let _ = writer.get_ref().shutdown(Shutdown::Both); | ||
| let _ = reply.send(result); | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| }); | ||
| Ok(Self { | ||
| address, | ||
| state: Arc::new(Mutex::new(AtofStreamingExporterState { | ||
| sender: Some(sender), | ||
| writer_thread: Some(writer_thread), | ||
| events_sent: 0, | ||
| last_error, | ||
| })), | ||
| }) | ||
| } | ||
|
|
||
| /// Connect to a separate local ATOF stream receiver at the given TCP address. | ||
| pub fn connect(address: impl Into<String>) -> Result<Self> { | ||
| Self::new(AtofStreamingExporterConfig::new(address)) | ||
| } | ||
|
|
||
| /// Return the connected stream receiver address. | ||
| pub fn address(&self) -> &str { | ||
| &self.address | ||
| } | ||
|
|
||
| /// Return an event subscriber that writes one canonical JSONL record per event. | ||
| pub fn subscriber(&self) -> EventSubscriberFn { | ||
| let state = Arc::clone(&self.state); | ||
| Arc::new(move |event: &Event| { | ||
| let value = match serialize_event(event) { | ||
| Ok(value) => value, | ||
| Err(error) => { | ||
| if let Ok(state) = state.lock() { | ||
| store_stream_error(&state.last_error, error); | ||
| } | ||
| return; | ||
| } | ||
| }; | ||
| let Ok(mut state) = state.lock() else { | ||
| return; | ||
| }; | ||
| if stream_last_error(&state.last_error).is_some() { | ||
| return; | ||
| } | ||
| let Some(sender) = state.sender.as_ref() else { | ||
| store_stream_error(&state.last_error, "stream receiver is closed".to_string()); | ||
| return; | ||
| }; | ||
| match sender.try_send(AtofStreamMessage::Event(value)) { | ||
| Ok(()) => { | ||
| state.events_sent += 1; | ||
| } | ||
| Err(mpsc::TrySendError::Full(_)) => { | ||
| store_stream_error(&state.last_error, "ATOF stream queue is full".to_string()); | ||
| } | ||
| Err(mpsc::TrySendError::Disconnected(_)) => { | ||
| store_stream_error( | ||
| &state.last_error, | ||
| "ATOF stream writer is disconnected".to_string(), | ||
| ); | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| /// Register this streaming exporter globally under the given subscriber name. | ||
| pub fn register(&self, name: &str) -> Result<()> { | ||
| register_subscriber(name, self.subscriber()).map_err(Into::into) | ||
| } | ||
|
|
||
| /// Deregister a global subscriber by name. | ||
| pub fn deregister(&self, name: &str) -> Result<bool> { | ||
| deregister_subscriber(name).map_err(Into::into) | ||
| } | ||
|
|
||
| /// Flush the stream and report any stored write error. | ||
| pub fn force_flush(&self) -> Result<()> { | ||
| let (sender, last_error) = { | ||
| let state = self | ||
| .state | ||
| .lock() | ||
| .map_err(|_| AtofExporterError::LockPoisoned)?; | ||
| if let Some(message) = stream_last_error(&state.last_error) { | ||
| return Err(AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message, | ||
| }); | ||
| } | ||
| (state.sender.clone(), Arc::clone(&state.last_error)) | ||
| }; | ||
| let Some(sender) = sender else { | ||
| return Ok(()); | ||
| }; | ||
| let (reply_sender, reply_receiver) = mpsc::channel(); | ||
| if sender.send(AtofStreamMessage::Flush(reply_sender)).is_err() { | ||
| return Err(AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message: "ATOF stream writer is disconnected".to_string(), | ||
| }); | ||
| } | ||
| match reply_receiver.recv() { | ||
| Ok(Ok(())) => { | ||
| if let Some(message) = stream_last_error(&last_error) { | ||
| return Err(AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message, | ||
| }); | ||
| } | ||
| Ok(()) | ||
| } | ||
| Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message, | ||
| }), | ||
| Err(error) => Err(AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message: error.to_string(), | ||
| }), | ||
| } | ||
| } | ||
|
|
||
| /// Shut down the stream by flushing and closing the TCP connection. | ||
| pub fn shutdown(&self) -> Result<()> { | ||
| let flush_result = self.force_flush(); | ||
| let (sender, writer_thread, last_error) = { | ||
| let mut state = self | ||
| .state | ||
| .lock() | ||
| .map_err(|_| AtofExporterError::LockPoisoned)?; | ||
| ( | ||
| state.sender.take(), | ||
| state.writer_thread.take(), | ||
| Arc::clone(&state.last_error), | ||
| ) | ||
| }; | ||
| let shutdown_result = if let Some(sender) = sender { | ||
| let (reply_sender, reply_receiver) = mpsc::channel(); | ||
| let send_result = sender | ||
| .send(AtofStreamMessage::Shutdown(reply_sender)) | ||
| .map_err(|_| AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message: "ATOF stream writer is disconnected".to_string(), | ||
| }); | ||
| match send_result { | ||
| Ok(()) => match reply_receiver.recv() { | ||
| Ok(Ok(())) => Ok(()), | ||
| Ok(Err(message)) => Err(AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message, | ||
| }), | ||
| Err(error) => Err(AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message: error.to_string(), | ||
| }), | ||
| }, | ||
| Err(error) => Err(error), | ||
| } | ||
| } else { | ||
| Ok(()) | ||
| }; | ||
| if let Some(writer_thread) = writer_thread { | ||
| let _ = writer_thread.join(); | ||
| } | ||
| let stored_result = | ||
| stream_last_error(&last_error).map(|message| AtofExporterError::StoredStreamFailure { | ||
| address: self.address.clone(), | ||
| message, | ||
| }); | ||
| match (flush_result, shutdown_result) { | ||
| (Err(error), _) => Err(error), | ||
| (Ok(()), Err(error)) => Err(error), | ||
| (Ok(()), Ok(())) => stored_result.map_or(Ok(()), Err), | ||
| } | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| /// Return a point-in-time delivery snapshot for diagnostics and tests. | ||
| pub fn stats(&self) -> AtofStreamingExporterStats { | ||
| let Ok(state) = self.state.lock() else { | ||
| return AtofStreamingExporterStats { | ||
| last_error: Some("the ATOF streaming exporter state lock was poisoned".to_string()), | ||
| ..AtofStreamingExporterStats::default() | ||
| }; | ||
| }; | ||
| AtofStreamingExporterStats { | ||
| events_sent: state.events_sent, | ||
| last_error: stream_last_error(&state.last_error), | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+261
to
+555
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Run the required Rust/core validation matrix before merge. The recorded verification only mentions As per coding guidelines "Any Rust change must run 🤖 Prompt for AI Agents |
||
|
|
||
| fn default_filename() -> String { | ||
| format!( | ||
| "nemo-relay-events-{}.jsonl", | ||
|
|
@@ -251,15 +580,35 @@ fn open_file(path: &Path, mode: AtofExporterMode) -> Result<File> { | |
| }) | ||
| } | ||
|
|
||
| fn write_event(writer: &mut BufWriter<File>, event: &Event) -> std::result::Result<(), String> { | ||
| fn write_event(writer: &mut impl Write, event: &Event) -> std::result::Result<(), String> { | ||
| write_serialized_event(writer, &serialize_event(event)?) | ||
| } | ||
|
|
||
| fn serialize_event(event: &Event) -> std::result::Result<String, String> { | ||
| let value = event | ||
| .try_to_json_value() | ||
| .map_err(|error| error.to_string())?; | ||
| serde_json::to_writer(&mut *writer, &value).map_err(|error| error.to_string())?; | ||
| serde_json::to_string(&value).map_err(|error| error.to_string()) | ||
| } | ||
|
|
||
| fn write_serialized_event(writer: &mut impl Write, value: &str) -> std::result::Result<(), String> { | ||
| writer | ||
| .write_all(value.as_bytes()) | ||
| .map_err(|error| error.to_string())?; | ||
| writer.write_all(b"\n").map_err(|error| error.to_string())?; | ||
| writer.flush().map_err(|error| error.to_string()) | ||
| } | ||
|
|
||
| fn store_stream_error(last_error: &Arc<Mutex<Option<String>>>, error: String) { | ||
| if let Ok(mut last_error) = last_error.lock() { | ||
| last_error.get_or_insert(error); | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| } | ||
|
|
||
| fn stream_last_error(last_error: &Arc<Mutex<Option<String>>>) -> Option<String> { | ||
| last_error.lock().ok().and_then(|error| error.clone()) | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Tests | ||
| // --------------------------------------------------------------------------- | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.