Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 55 additions & 29 deletions sentry-core/src/logs.rs → sentry-core/src/batcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Batching for Sentry [structured logs](https://docs.sentry.io/product/explore/logs/).
//! Generic batching for Sentry envelope items.

use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread::JoinHandle;
Expand All @@ -9,37 +9,61 @@ use crate::protocol::EnvelopeItem;
use crate::Envelope;
use sentry_types::protocol::v7::Log;

// Flush when there's 100 logs in the buffer
const MAX_LOG_ITEMS: usize = 100;
// Flush when there's 100 items in the buffer
const MAX_ITEMS: usize = 100;
// Or when 5 seconds have passed from the last flush
const FLUSH_INTERVAL: Duration = Duration::from_secs(5);

#[derive(Debug, Default)]
struct LogQueue {
logs: Vec<Log>,
#[derive(Debug)]
struct BatchQueue<T> {
items: Vec<T>,
}

/// Accumulates logs in the queue and submits them through the transport when one of the flushing
pub(crate) trait IntoBatchEnvelopeItem: Sized {
fn into_envelope_item(items: Vec<Self>) -> EnvelopeItem;
}

impl<T> IntoBatchEnvelopeItem for T
where
Vec<T>: Into<EnvelopeItem>,
{
fn into_envelope_item(items: Vec<Self>) -> EnvelopeItem {
items.into()
}
}

pub(crate) trait Batch: IntoBatchEnvelopeItem {
const TYPE_NAME: &str;
}

impl Batch for Log {
const TYPE_NAME: &str = "logs";
}

/// Accumulates items in the queue and submits them through the transport when one of the flushing
/// conditions is met.
pub(crate) struct LogsBatcher {
pub(crate) struct Batcher<T: Batch> {
transport: TransportArc,
queue: Arc<Mutex<LogQueue>>,
queue: Arc<Mutex<BatchQueue<T>>>,
shutdown: Arc<(Mutex<bool>, Condvar)>,
worker: Option<JoinHandle<()>>,
}

impl LogsBatcher {
/// Creates a new LogsBatcher that will submit envelopes to the given `transport`.
impl<T> Batcher<T>
where
T: Batch + Send + 'static,
{
/// Creates a new Batcher that will submit envelopes to the given `transport`.
pub(crate) fn new(transport: TransportArc) -> Self {
let queue = Arc::new(Mutex::new(Default::default()));
let queue = Arc::new(Mutex::new(BatchQueue { items: Vec::new() }));
#[allow(clippy::mutex_atomic)]
let shutdown = Arc::new((Mutex::new(false), Condvar::new()));

let worker_transport = transport.clone();
let worker_queue = queue.clone();
let worker_shutdown = shutdown.clone();
let worker = std::thread::Builder::new()
.name("sentry-logs-batcher".into())
.name(format!("sentry-{}-batcher", T::TYPE_NAME))
.spawn(move || {
let (lock, cvar) = worker_shutdown.as_ref();
let mut shutdown = lock.lock().unwrap();
Expand All @@ -57,7 +81,7 @@ impl LogsBatcher {
return;
}
if last_flush.elapsed() >= FLUSH_INTERVAL {
LogsBatcher::flush_queue_internal(
Batcher::flush_queue_internal(
worker_queue.lock().unwrap(),
&worker_transport,
);
Expand All @@ -74,48 +98,50 @@ impl LogsBatcher {
worker: Some(worker),
}
}
}

/// Enqueues a log for delayed sending.
impl<T: Batch> Batcher<T> {
/// Enqueues an item for delayed sending.
///
/// This will automatically flush the queue if it reaches a size of `BATCH_SIZE`.
pub(crate) fn enqueue(&self, log: Log) {
/// This will automatically flush the queue if it reaches a size of `MAX_ITEMS`.
pub(crate) fn enqueue(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.logs.push(log);
if queue.logs.len() >= MAX_LOG_ITEMS {
LogsBatcher::flush_queue_internal(queue, &self.transport);
queue.items.push(item);
if queue.items.len() >= MAX_ITEMS {
Batcher::flush_queue_internal(queue, &self.transport);
}
}

/// Flushes the queue to the transport.
pub(crate) fn flush(&self) {
let queue = self.queue.lock().unwrap();
LogsBatcher::flush_queue_internal(queue, &self.transport);
Batcher::flush_queue_internal(queue, &self.transport);
}

/// Flushes the queue to the transport.
///
/// This is a static method as it will be called from both the background
/// thread and the main thread on drop.
fn flush_queue_internal(mut queue_lock: MutexGuard<LogQueue>, transport: &TransportArc) {
let logs = std::mem::take(&mut queue_lock.logs);
fn flush_queue_internal(mut queue_lock: MutexGuard<BatchQueue<T>>, transport: &TransportArc) {
let items = std::mem::take(&mut queue_lock.items);
drop(queue_lock);

if logs.is_empty() {
if items.is_empty() {
return;
}

sentry_debug!("[LogsBatcher] Flushing {} logs", logs.len());
sentry_debug!("[Batcher({})] Flushing {} items", T::TYPE_NAME, items.len());

if let Some(ref transport) = *transport.read().unwrap() {
let mut envelope = Envelope::new();
let logs_item: EnvelopeItem = logs.into();
envelope.add_item(logs_item);
let envelope_item = T::into_envelope_item(items);
envelope.add_item(envelope_item);
transport.send_envelope(envelope);
}
}
}

impl Drop for LogsBatcher {
impl<T: Batch> Drop for Batcher<T> {
fn drop(&mut self) {
let (lock, cvar) = self.shutdown.as_ref();
*lock.lock().unwrap() = true;
Expand All @@ -124,7 +150,7 @@ impl Drop for LogsBatcher {
if let Some(worker) = self.worker.take() {
worker.join().ok();
}
LogsBatcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
Batcher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
}
}

Expand Down
10 changes: 5 additions & 5 deletions sentry-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use crate::protocol::SessionUpdate;
use rand::random;
use sentry_types::random_uuid;

use crate::constants::SDK_INFO;
#[cfg(feature = "logs")]
use crate::logs::LogsBatcher;
use crate::batcher::Batcher;
use crate::constants::SDK_INFO;
use crate::protocol::{ClientSdkInfo, Event};
#[cfg(feature = "release-health")]
use crate::session::SessionFlusher;
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct Client {
#[cfg(feature = "release-health")]
session_flusher: RwLock<Option<SessionFlusher>>,
#[cfg(feature = "logs")]
logs_batcher: RwLock<Option<LogsBatcher>>,
logs_batcher: RwLock<Option<Batcher<Log>>>,
#[cfg(feature = "logs")]
default_log_attributes: Option<BTreeMap<String, LogAttribute>>,
integrations: Vec<(TypeId, Arc<dyn Integration>)>,
Expand Down Expand Up @@ -86,7 +86,7 @@ impl Clone for Client {

#[cfg(feature = "logs")]
let logs_batcher = RwLock::new(if self.options.enable_logs {
Some(LogsBatcher::new(transport.clone()))
Some(Batcher::new(transport.clone()))
} else {
None
});
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Client {

#[cfg(feature = "logs")]
let logs_batcher = RwLock::new(if options.enable_logs {
Some(LogsBatcher::new(transport.clone()))
Some(Batcher::new(transport.clone()))
} else {
None
});
Expand Down
4 changes: 2 additions & 2 deletions sentry-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ pub use crate::transport::{Transport, TransportFactory};
mod logger; // structured logging macros exported with `#[macro_export]`

// client feature
#[cfg(all(feature = "client", feature = "logs"))]
mod batcher;
#[cfg(feature = "client")]
mod client;
#[cfg(feature = "client")]
mod hub_impl;
#[cfg(all(feature = "client", feature = "logs"))]
mod logs;
#[cfg(feature = "client")]
mod session;

Expand Down