diff --git a/sentry-core/src/logs.rs b/sentry-core/src/batcher.rs similarity index 71% rename from sentry-core/src/logs.rs rename to sentry-core/src/batcher.rs index 9be3ee335..dfc28bf5e 100644 --- a/sentry-core/src/logs.rs +++ b/sentry-core/src/batcher.rs @@ -1,4 +1,4 @@ -//! Batching for Sentry [structured logs](https://docs.sentry.io/product/explore/logs/). +//! Generic batching for Sentry envelope items. use std::sync::{Arc, Condvar, Mutex, MutexGuard}; use std::thread::JoinHandle; @@ -9,29 +9,53 @@ use crate::protocol::EnvelopeItem; use crate::Envelope; use sentry_types::protocol::v7::Log; -// Flush when there's 100 logs in the buffer -const MAX_LOG_ITEMS: usize = 100; +// Flush when there's 100 items in the buffer +const MAX_ITEMS: usize = 100; // Or when 5 seconds have passed from the last flush const FLUSH_INTERVAL: Duration = Duration::from_secs(5); -#[derive(Debug, Default)] -struct LogQueue { - logs: Vec, +#[derive(Debug)] +struct BatchQueue { + items: Vec, } -/// Accumulates logs in the queue and submits them through the transport when one of the flushing +pub(crate) trait IntoBatchEnvelopeItem: Sized { + fn into_envelope_item(items: Vec) -> EnvelopeItem; +} + +impl IntoBatchEnvelopeItem for T +where + Vec: Into, +{ + fn into_envelope_item(items: Vec) -> EnvelopeItem { + items.into() + } +} + +pub(crate) trait Batch: IntoBatchEnvelopeItem { + const TYPE_NAME: &str; +} + +impl Batch for Log { + const TYPE_NAME: &str = "logs"; +} + +/// Accumulates items in the queue and submits them through the transport when one of the flushing /// conditions is met. -pub(crate) struct LogsBatcher { +pub(crate) struct Batcher { transport: TransportArc, - queue: Arc>, + queue: Arc>>, shutdown: Arc<(Mutex, Condvar)>, worker: Option>, } -impl LogsBatcher { - /// Creates a new LogsBatcher that will submit envelopes to the given `transport`. +impl Batcher +where + T: Batch + Send + 'static, +{ + /// Creates a new Batcher that will submit envelopes to the given `transport`. pub(crate) fn new(transport: TransportArc) -> Self { - let queue = Arc::new(Mutex::new(Default::default())); + let queue = Arc::new(Mutex::new(BatchQueue { items: Vec::new() })); #[allow(clippy::mutex_atomic)] let shutdown = Arc::new((Mutex::new(false), Condvar::new())); @@ -39,7 +63,7 @@ impl LogsBatcher { let worker_queue = queue.clone(); let worker_shutdown = shutdown.clone(); let worker = std::thread::Builder::new() - .name("sentry-logs-batcher".into()) + .name(format!("sentry-{}-batcher", T::TYPE_NAME)) .spawn(move || { let (lock, cvar) = worker_shutdown.as_ref(); let mut shutdown = lock.lock().unwrap(); @@ -57,7 +81,7 @@ impl LogsBatcher { return; } if last_flush.elapsed() >= FLUSH_INTERVAL { - LogsBatcher::flush_queue_internal( + Batcher::flush_queue_internal( worker_queue.lock().unwrap(), &worker_transport, ); @@ -74,48 +98,50 @@ impl LogsBatcher { worker: Some(worker), } } +} - /// Enqueues a log for delayed sending. +impl Batcher { + /// Enqueues an item for delayed sending. /// - /// This will automatically flush the queue if it reaches a size of `BATCH_SIZE`. - pub(crate) fn enqueue(&self, log: Log) { + /// This will automatically flush the queue if it reaches a size of `MAX_ITEMS`. + pub(crate) fn enqueue(&self, item: T) { let mut queue = self.queue.lock().unwrap(); - queue.logs.push(log); - if queue.logs.len() >= MAX_LOG_ITEMS { - LogsBatcher::flush_queue_internal(queue, &self.transport); + queue.items.push(item); + if queue.items.len() >= MAX_ITEMS { + Batcher::flush_queue_internal(queue, &self.transport); } } /// Flushes the queue to the transport. pub(crate) fn flush(&self) { let queue = self.queue.lock().unwrap(); - LogsBatcher::flush_queue_internal(queue, &self.transport); + Batcher::flush_queue_internal(queue, &self.transport); } /// Flushes the queue to the transport. /// /// This is a static method as it will be called from both the background /// thread and the main thread on drop. - fn flush_queue_internal(mut queue_lock: MutexGuard, transport: &TransportArc) { - let logs = std::mem::take(&mut queue_lock.logs); + fn flush_queue_internal(mut queue_lock: MutexGuard>, transport: &TransportArc) { + let items = std::mem::take(&mut queue_lock.items); drop(queue_lock); - if logs.is_empty() { + if items.is_empty() { return; } - sentry_debug!("[LogsBatcher] Flushing {} logs", logs.len()); + sentry_debug!("[Batcher({})] Flushing {} items", T::TYPE_NAME, items.len()); if let Some(ref transport) = *transport.read().unwrap() { let mut envelope = Envelope::new(); - let logs_item: EnvelopeItem = logs.into(); - envelope.add_item(logs_item); + let envelope_item = T::into_envelope_item(items); + envelope.add_item(envelope_item); transport.send_envelope(envelope); } } } -impl Drop for LogsBatcher { +impl Drop for Batcher { fn drop(&mut self) { let (lock, cvar) = self.shutdown.as_ref(); *lock.lock().unwrap() = true; @@ -124,7 +150,7 @@ impl Drop for LogsBatcher { if let Some(worker) = self.worker.take() { worker.join().ok(); } - LogsBatcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport); + Batcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport); } } diff --git a/sentry-core/src/client.rs b/sentry-core/src/client.rs index a4b72d080..003d3edc5 100644 --- a/sentry-core/src/client.rs +++ b/sentry-core/src/client.rs @@ -12,9 +12,9 @@ use crate::protocol::SessionUpdate; use rand::random; use sentry_types::random_uuid; -use crate::constants::SDK_INFO; #[cfg(feature = "logs")] -use crate::logs::LogsBatcher; +use crate::batcher::Batcher; +use crate::constants::SDK_INFO; use crate::protocol::{ClientSdkInfo, Event}; #[cfg(feature = "release-health")] use crate::session::SessionFlusher; @@ -58,7 +58,7 @@ pub struct Client { #[cfg(feature = "release-health")] session_flusher: RwLock>, #[cfg(feature = "logs")] - logs_batcher: RwLock>, + logs_batcher: RwLock>>, #[cfg(feature = "logs")] default_log_attributes: Option>, integrations: Vec<(TypeId, Arc)>, @@ -86,7 +86,7 @@ impl Clone for Client { #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if self.options.enable_logs { - Some(LogsBatcher::new(transport.clone())) + Some(Batcher::new(transport.clone())) } else { None }); @@ -171,7 +171,7 @@ impl Client { #[cfg(feature = "logs")] let logs_batcher = RwLock::new(if options.enable_logs { - Some(LogsBatcher::new(transport.clone())) + Some(Batcher::new(transport.clone())) } else { None }); diff --git a/sentry-core/src/lib.rs b/sentry-core/src/lib.rs index a39e63ca5..6a2a1b5dd 100644 --- a/sentry-core/src/lib.rs +++ b/sentry-core/src/lib.rs @@ -136,12 +136,12 @@ pub use crate::transport::{Transport, TransportFactory}; mod logger; // structured logging macros exported with `#[macro_export]` // client feature +#[cfg(all(feature = "client", feature = "logs"))] +mod batcher; #[cfg(feature = "client")] mod client; #[cfg(feature = "client")] mod hub_impl; -#[cfg(all(feature = "client", feature = "logs"))] -mod logs; #[cfg(feature = "client")] mod session;