Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
397 changes: 397 additions & 0 deletions libdd-trace-utils/src/change_buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,397 @@
use std::collections::HashMap;

use anyhow::{anyhow, bail, Result};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personal opinion about anyhow. I think it's good for either prototyping things or using it in standalone binaries. On the other hand using it in libraries has its downsides, the main one being bubbling errors up to the caller, hence difficulting the SDKs consuming our libraries to do proper error handling. That said, I think that is not a problem right now and we can talk about it in subsequent PRs.

Copy link
Copy Markdown
Contributor

@ekump ekump Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking: +1. I don't think anyhow should be used in the public API for a library. But I'm ok with a TODO comment and taking care of it in the future.


mod utils;
use utils::*;

mod trace;
pub use trace::*;

use crate::span::{Span, SpanText};

#[derive(Clone, Copy)]
pub struct ChangeBuffer {
ptr: *mut u8,
len: usize,
}

impl ChangeBuffer {
pub unsafe fn from_raw_parts(ptr: *const u8, len: usize) -> Self {
Self { ptr: ptr as *mut u8, len }
}

fn as_slice(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
}

fn as_mut_slice(&mut self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
}

pub fn read<T: Copy + FromBytes>(&self, index: &mut usize) -> Result<T> {
let size = std::mem::size_of::<T>();
let slice = self.as_slice();
let bytes = slice.get(*index..*index + size)
.ok_or(anyhow!("read_u64 out of bounds: offset={}, len={}", *index, self.len))?;
let array: [u8; 8] = bytes.try_into()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is generic over T. What happens if T isn't 8 bytes, like u32 or u128?

Do you need to convert bytes into array here? I don't think we lose any error fidelity because I don't think it's possible for try_into to fail here? We have the size, and slice.get() uses size to get exactly the correct bytes.

You could just pass bytes directly with: Ok(T::from_bytes(bytes)).

.map_err(|_| anyhow!("failed to convert slice to [u8; 8]"))?;
*index += size;
Ok(T::from_bytes(&array))
}

pub fn write_u64(&mut self, offset: usize, value: u64) -> Result<()> {
let len = self.len;
let slice = self.as_mut_slice();
let target = slice.get_mut(offset..offset + 8)
.ok_or(anyhow!("write_u64 out of bounds: offset={}, len={}", offset, len))?;
target.copy_from_slice(&value.to_le_bytes());
Ok(())
}

pub fn clear_count(&mut self) -> Result<()> {
self.write_u64(0, 0)
.map_err(|_| anyhow!("failed to clear buffer count"))
}
}

pub struct ChangeBufferState<T: SpanText + Clone> {
Comment thread
hoolioh marked this conversation as resolved.
change_buffer: ChangeBuffer,
spans: HashMap<u64, Span<T>>,
traces: HashMap<u128, Trace<T>>,
string_table: HashMap<u32, T>,
trace_span_counts: HashMap<u128, usize>,
tracer_service: T,
tracer_language: T,
pid: f64,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't PIDs integers?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in trace exporter's config 🤷

Copy link
Copy Markdown
Contributor

@ekump ekump Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what you're referring to? Because we send it as a metric, which has to be f64?

Would it be better to do the conversion to f64 internally and not expose that implementation detail in the public API? Going from u32 to f64 should be cheap in Rust.

This is non-blocking. It's not a hill I need to die on.

}

fn new_span<T: SpanText>(span_id: u64, parent_id: u64, trace_id: u128) -> Span<T> {
Span {
span_id,
trace_id,
parent_id,
..Default::default()
}
}

impl<T: SpanText + Clone> ChangeBufferState<T> {
pub fn new(
change_buffer: ChangeBuffer,
tracer_service: T,
tracer_language: T,
pid: f64,
) -> Self {
ChangeBufferState {
change_buffer,
spans: Default::default(),
traces: Default::default(),
string_table: Default::default(),
trace_span_counts: Default::default(),
tracer_service,
tracer_language,
pid,
}
}

pub fn flush_chunk(
&mut self,
span_ids: Vec<u64>,
first_is_local_root: bool,
) -> Result<Vec<Span<T>>> {
let mut chunk_trace_id: Option<u128> = None;
let mut is_local_root = first_is_local_root;
let mut is_chunk_root = true;

let spans_vec = span_ids
.iter()
.map(|span_id| -> Result<Span<T>> {
let maybe_span = self.spans.remove(span_id);

let mut span = maybe_span.ok_or_else(|| anyhow!("span not found: {}", span_id))?;
chunk_trace_id = Some(span.trace_id);

if is_local_root {
self.copy_in_sampling_tags(&mut span);
span.metrics
.insert(T::from_static_str("_dd.top_level"), 1.0);
is_local_root = false;
}
if is_chunk_root {
self.copy_in_chunk_tags(&mut span);
is_chunk_root = false;
}

self.process_one_span(&mut span);

Ok(span)
})
.collect::<Result<Vec<_>>>()?;

// Clean up trace if no spans remain for it
if let Some(trace_id) = chunk_trace_id {
if let Some(count) = self.trace_span_counts.get_mut(&trace_id) {
let len = span_ids.len();
if *count <= len {
// All spans for this trace have been flushed
self.traces.remove(&trace_id);
self.trace_span_counts.remove(&trace_id);
} else {
*count -= len;
}
}
}

Ok(spans_vec)
}

fn copy_in_sampling_tags(&self, span: &mut Span<T>) {
if let Some(trace) = self.traces.get(&span.trace_id) {
if let Some(rule) = trace.sampling_rule_decision {
span.metrics
.insert(T::from_static_str("_dd.rule_psr"), rule);
}
if let Some(rule) = trace.sampling_limit_decision {
span.metrics.insert(T::from_static_str("_dd.limit_psr"), rule);
}
if let Some(rule) = trace.sampling_agent_decision {
span.metrics
.insert(T::from_static_str("_dd.agent_psr"), rule);
}
}
}

fn copy_in_chunk_tags(&self, span: &mut Span<T>) {
if let Some(trace) = self.traces.get(&span.trace_id) {
span.meta.reserve(trace.meta.len());
for (k, v) in &trace.meta {
span.meta.insert(k.clone(), v.clone());
}
span.metrics.reserve(trace.metrics.len());
for (k, v) in &trace.metrics {
span.metrics.insert(k.clone(), *v);
}
}
}

fn process_one_span(&self, span: &mut Span<T>) {
// TODO span.sample();

if let Some(kind) = span.meta.get("kind") {
if kind != &T::from_static_str("internal") {
span.metrics.insert(T::from_static_str("_dd.measured"), 1.0);
}
}

if span.service != self.tracer_service {
span.meta.insert(
T::from_static_str("_dd.base_service"),
self.tracer_service.clone(),
);
// TODO span.service should be added to the "extra services" used by RC, which is not
// yet implemented here
}

// SKIP setting single-span ingestion. They should be set when sampling is finalized for
// the span.

span.meta
.insert(T::from_static_str("language"), self.tracer_language.clone());
span.metrics
.insert(T::from_static_str("process_id"), self.pid);

if let Some(trace) = self.traces.get(&span.trace_id) {
if let Some(origin) = trace.origin.clone() {
span.meta.insert(T::from_static_str("_dd.origin"), origin);
}
}

// SKIP hostname. This can be an option to the span constructor, so we'll set the tag at
// that point.

// TODO Sampling priority, if we're not doing that ahead of time.
}

pub fn flush_change_buffer(&mut self) -> Result<()> {
let mut index = 0;
let mut count = self.change_buffer.read::<u64>(&mut index)?;
index += 8;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be misunderstanding, but doesn't the index get incremented in read? Won't this double increment the index?

Assuming I'm setting it up correctly this test would expose the double increment and the size issue in Read.

#[cfg(test)]
mod tests {
    use super::*;
    use libdd_tinybytes::BytesString;

    #[test]
    fn test_flush_change_buffer_single_create() {
        // Buffer: [count=1, opcode=Create(0), span_id, trace_id, parent_id]
        let mut buf = vec![];
        buf.extend_from_slice(&1u64.to_le_bytes()); // count = 1
        buf.extend_from_slice(&0u64.to_le_bytes()); // opcode = Create
        buf.extend_from_slice(&123u64.to_le_bytes()); // span_id
        buf.extend_from_slice(&456u128.to_le_bytes()); // trace_id
        buf.extend_from_slice(&0u64.to_le_bytes()); // parent_id

        let change_buffer =
            unsafe { ChangeBuffer::from_raw_parts(buf.as_ptr(), buf.len()) };
        let mut state = ChangeBufferState::<BytesString>::new(
            change_buffer,
            BytesString::from_static("service"),
            BytesString::from_static("rust"),
            1234.0,
        );

        state.flush_change_buffer().unwrap();

        let span = state.get_span(&123).unwrap();
        assert_eq!(span.span_id, 123);
        assert_eq!(span.trace_id, 456);
    }
}

Also, I just realized the change buffer has no unit tests.


while count > 0 {
let op = BufferedOperation::from_buf(&self.change_buffer, &mut index)?;
self.interpret_operation(&mut index, &op)?;
count -= 1;
}

// Write 0 back to the count position so the writing side of the buffer knows the queue was
// flushed
self.change_buffer.write_u64(0, 0)?;

Ok(())
}

fn get_string_arg(&self, index: &mut usize) -> Result<T> {
let num: u32 = self.get_num_arg(index)?;
self.string_table
.get(&num)
.cloned()
.ok_or_else(|| anyhow!("string not found internally: {}", num))
}

fn get_num_arg<U: Copy + FromBytes>(&self, index: &mut usize) -> Result<U> {
self.change_buffer.read(index)
}

fn get_mut_span(&mut self, id: &u64) -> Result<&mut Span<T>> {
self.spans
.get_mut(id)
.ok_or_else(|| anyhow!("span not found internally: {}", id))
}

pub fn get_span(&self, id: &u64) -> Result<&Span<T>> {
self.spans
.get(id)
.ok_or_else(|| anyhow!("span not found internally: {}", id))
}

pub fn get_trace(&self, id: &u128) -> Option<&Trace<T>> {
self.traces.get(id)
}

fn interpret_operation(&mut self, index: &mut usize, op: &BufferedOperation) -> Result<()> {
match op.opcode {
OpCode::Create => {
let trace_id: u128 = self.change_buffer.read(index)?;
let parent_id = self.get_num_arg(index)?;
let span = new_span(op.span_id, parent_id, trace_id);
self.spans.insert(op.span_id, span);
// Ensure trace exists (creates new one if this is the first span for this trace)
self.traces.entry(trace_id).or_default();

*self.trace_span_counts.entry(trace_id).or_insert(0) += 1;
}
OpCode::SetMetaAttr => {
let name = self.get_string_arg(index)?;
let val = self.get_string_arg(index)?;
let span = self.get_mut_span(&op.span_id)?;
span.meta.insert(name, val);
}
OpCode::SetMetricAttr => {
let name = self.get_string_arg(index)?;
let val: f64 = self.get_num_arg(index)?;
let span = self.get_mut_span(&op.span_id)?;
span.metrics.insert(name, val);
}
OpCode::SetServiceName => {
self.get_mut_span(&op.span_id)?.service = self.get_string_arg(index)?;
}
OpCode::SetResourceName => {
self.get_mut_span(&op.span_id)?.resource = self.get_string_arg(index)?;
}
OpCode::SetError => {
self.get_mut_span(&op.span_id)?.error = self.get_num_arg(index)?;
}
OpCode::SetStart => {
self.get_mut_span(&op.span_id)?.start = self.get_num_arg(index)?;
}
OpCode::SetDuration => {
self.get_mut_span(&op.span_id)?.duration = self.get_num_arg(index)?;
}
OpCode::SetType => {
self.get_mut_span(&op.span_id)?.r#type = self.get_string_arg(index)?;
}
OpCode::SetName => {
self.get_mut_span(&op.span_id)?.name = self.get_string_arg(index)?;
}
OpCode::SetTraceMetaAttr => {
let name = self.get_string_arg(index)?;
let val = self.get_string_arg(index)?;
let trace_id = self.get_span(&op.span_id)?.trace_id;
if let Some(trace) = self.traces.get_mut(&trace_id) {
trace.meta.insert(name, val);
}
}
OpCode::SetTraceMetricsAttr => {
let name = self.get_string_arg(index)?;
let val = self.get_num_arg(index)?;
let trace_id = self.get_span(&op.span_id)?.trace_id;
if let Some(trace) = self.traces.get_mut(&trace_id) {
trace.metrics.insert(name, val);
}
}
OpCode::SetTraceOrigin => {
let origin = self.get_string_arg(index)?;
let trace_id = self.get_span(&op.span_id)?.trace_id;
if let Some(trace) = self.traces.get_mut(&trace_id) {
trace.origin = Some(origin);
}
}
};

Ok(())
}

pub fn string_table_insert_one(&mut self, key: u32, val: T) {
self.string_table.insert(key, val);
}

pub fn string_table_evict_one(&mut self, key: u32) {
self.string_table.remove(&key);
}
}
#[repr(u64)]
pub enum OpCode {
Create = 0,
SetMetaAttr = 1,
SetMetricAttr = 2,
SetServiceName = 3,
SetResourceName = 4,
SetError = 5,
SetStart = 6,
SetDuration = 7,
SetType = 8,
SetName = 9,
SetTraceMetaAttr = 10,
SetTraceMetricsAttr = 11,
SetTraceOrigin = 12,
// TODO: SpanLinks, SpanEvents, StructAttr
}

impl TryFrom<u64> for OpCode {
type Error = anyhow::Error;

fn try_from(val: u64) -> Result<Self> {
Ok(match val {
0 => OpCode::Create,
1 => OpCode::SetMetaAttr,
2 => OpCode::SetMetricAttr,
3 => OpCode::SetServiceName,
4 => OpCode::SetResourceName,
5 => OpCode::SetError,
6 => OpCode::SetStart,
7 => OpCode::SetDuration,
8 => OpCode::SetType,
9 => OpCode::SetName,
10 => OpCode::SetTraceMetaAttr,
11 => OpCode::SetTraceMetricsAttr,
12 => OpCode::SetTraceOrigin,
_ => bail!("unknown opcode")
})
}
}

pub struct BufferedOperation {
pub opcode: OpCode,
pub span_id: u64,
}

impl BufferedOperation {
pub fn from_buf(buf: &ChangeBuffer, index: &mut usize) -> Result<Self> {
let opcode = buf.read::<u64>(index)?.try_into()?;
let span_id = buf.read(index)?;
Ok(BufferedOperation {
opcode,
span_id,
})
}
}
Loading
Loading