---
title: Backend Telemetry Processor
description: Detailed backend telemetry processor design.
sidebar_order: 1
---
🚧 This document is a work in progress.

For the common specification, refer to the Telemetry Processor page. This page describes the backend-specific implementation. The key difference is that backend SDKs use weighted round-robin scheduling to ensure critical telemetry (like errors) gets priority over high-volume data (like logs) when the application is under heavy load.

Backend-Specific Design Decisions

  • Weighted round-robin scheduling: Backend applications often run under sustained high load. Weighted scheduling ensures critical telemetry (errors) gets sent even when flooded with high-volume data (logs, spans).
  • Signal-based scheduling: The scheduler wakes when new data arrives rather than polling, reducing CPU overhead in idle periods.

Priorities

  • CRITICAL: Error, Feedback.
  • HIGH: Session, CheckIn.
  • MEDIUM: Transaction, ClientReport, Span.
  • LOW: Log, Profile, ProfileChunk.
  • LOWEST: Replay.

The weight of each priority level is configurable.
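As a rough sketch of how configured weights could be expanded into the scheduling cycle used later by the scheduler (the `Priority` type and `buildCycle` helper here are illustrative, not the SDK's actual API):

```go
package main

import "fmt"

// Priority levels as described above; names are illustrative.
type Priority int

const (
	Lowest Priority = iota
	Low
	Medium
	High
	Critical
)

// buildCycle expands the configured weights into a flat scheduling cycle,
// e.g. weights {Critical: 5, High: 4, ...} yield
// [Critical×5, High×4, Medium×3, Low×2, Lowest×1].
func buildCycle(weights map[Priority]int) []Priority {
	order := []Priority{Critical, High, Medium, Low, Lowest}
	var cycle []Priority
	for _, p := range order {
		for i := 0; i < weights[p]; i++ {
			cycle = append(cycle, p)
		}
	}
	return cycle
}

func main() {
	cycle := buildCycle(map[Priority]int{
		Critical: 5, High: 4, Medium: 3, Low: 2, Lowest: 1,
	})
	fmt.Println(len(cycle)) // 5+4+3+2+1 = 15 slots per cycle
}
```

With the default weights, each full cycle gives CRITICAL five scheduling slots for every single LOWEST slot.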

Components

TelemetryBuffer

Unlike the push-based approach, the telemetry buffer does not push data to the telemetry scheduler. Instead, the telemetry scheduler iterates through all buffers using weighted round-robin and pulls data when it determines a buffer is ready to flush. Still, the telemetry buffer MUST follow the common telemetry buffer requirements. Here are the additional backend-specific requirements:

  1. The telemetry buffer SHOULD drop older items as the overflow policy. It MAY also drop newer items to preserve what's already buffered.

On the backend, use the same size limits as the common requirements, except for spans, where we recommend 1000 because span volume is higher.
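A minimal sketch of the drop-oldest overflow policy, assuming a simple slice-backed buffer (the `dropOldest` type is hypothetical, not the SDK's buffer implementation):

```go
package main

import "fmt"

// dropOldest is a minimal buffer sketch: when full, Offer evicts the
// oldest item so the newest telemetry is kept.
type dropOldest[T any] struct {
	items []T
	cap   int
}

// Offer appends item, evicting the oldest entry if the buffer is at
// capacity; it reports whether an eviction happened.
func (b *dropOldest[T]) Offer(item T) (dropped bool) {
	if len(b.items) == b.cap {
		b.items = b.items[1:] // evict the oldest item
		dropped = true
	}
	b.items = append(b.items, item)
	return dropped
}

func main() {
	b := &dropOldest[int]{cap: 3}
	for i := 1; i <= 4; i++ {
		b.Offer(i)
	}
	fmt.Println(b.items) // the oldest item (1) was evicted
}
```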

Span Buffer

The span buffer must follow the common telemetry span buffer requirements. Further requirements for the bucketed-by-trace buffer are:

  1. The span buffer MAY use FIFO to prioritize forwarding spans: always forward spans from the oldest traceID. FIFO prevents spans from lingering in the buffer.

Trace Consistency Trade-offs

A small subset of cases can still result in partial traces: either an old trace bucket was dropped and a new span with the same trace ID arrived afterwards, or an incoming span of that trace was dropped. The preferred overflow behavior in most cases is drop_oldest, since it produces the fewest incomplete traces across these two scenarios.
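The drop_oldest behavior for a bucketed-by-trace buffer can be sketched as follows (the `spanBuffer` type and `add` method are illustrative stand-ins, not the SDK's span buffer):

```go
package main

import "fmt"

// spanBuffer sketches drop_oldest for a bucketed-by-trace buffer: when
// bucket capacity is reached and a span for a new trace arrives, the
// oldest trace's bucket is evicted whole.
type spanBuffer struct {
	order   []string            // trace IDs, oldest first
	buckets map[string][]string // traceID -> buffered spans
	cap     int                 // max number of trace buckets
}

func (b *spanBuffer) add(traceID, span string) {
	if _, ok := b.buckets[traceID]; !ok {
		if len(b.order) == b.cap {
			// drop_oldest: evict the oldest trace's entire bucket.
			oldest := b.order[0]
			b.order = b.order[1:]
			delete(b.buckets, oldest)
		}
		b.order = append(b.order, traceID)
	}
	b.buckets[traceID] = append(b.buckets[traceID], span)
}

func main() {
	b := &spanBuffer{buckets: map[string][]string{}, cap: 2}
	b.add("t1", "s1")
	b.add("t2", "s2")
	b.add("t3", "s3") // evicts trace t1's bucket
	fmt.Println(len(b.buckets)) // 2
}
```

Evicting whole buckets rather than individual spans is what keeps the number of partial traces low: a trace is either forwarded or dropped as a unit, except in the edge cases described above.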

Buffers are mapped to DataCategories, which determine their scheduling priority and rate limits.

TelemetryScheduler

The TelemetryScheduler runs as a background worker, coordinating the flow of telemetry from buffers to the transport:

  • Initialization: Constructs a weighted priority cycle (e.g., [CRITICAL×5, HIGH×4, MEDIUM×3, ...]) based on configured weights.
  • Event loop: Wakes when explicitly signaled from the client's captureX methods when new data is available (if the language does not support this, a periodic ticker can be used instead).
  • Buffer selection: Iterates through the priority cycle, selecting buffers that are ready to flush and not rate limited.
  • Rate limit coordination: Queries the transport's rate limit state before attempting to send any category.
  • Envelope construction: Converts buffered items into Sentry protocol envelopes.
    • Log items are batched together into a single envelope with multiple log entries.
    • Other categories typically send one item per envelope.
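The log-batching rule can be illustrated with simplified stand-in types (the `envelope` and `envelopeItem` structs here are not the actual Sentry protocol types):

```go
package main

import "fmt"

// envelopeItem and envelope are simplified stand-ins for the Sentry
// protocol: logs share one item, other categories get one item each.
type envelopeItem struct {
	itemType string
	payloads []string
}

type envelope struct{ items []envelopeItem }

// buildLogEnvelope batches all buffered log records into a single
// envelope item with multiple entries.
func buildLogEnvelope(logs []string) envelope {
	return envelope{items: []envelopeItem{{itemType: "log", payloads: logs}}}
}

// buildEventEnvelope sends one item per envelope, as errors and
// transactions typically do.
func buildEventEnvelope(event string) envelope {
	return envelope{items: []envelopeItem{{itemType: "event", payloads: []string{event}}}}
}

func main() {
	env := buildLogEnvelope([]string{"a", "b", "c"})
	fmt.Println(len(env.items), len(env.items[0].payloads)) // one item, three log entries
}
```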

Transport

The transport layer handles HTTP communication with Sentry's ingestion endpoints.

The only layer responsible for dropping events is the Buffer. If the transport queue is full, the Buffer should drop the batch.

Configuration

Transport Options

  • Capacity: 1000 items.

Telemetry Buffer Options

  • Capacity: 100 items for errors and check-ins, 10*BATCH_SIZE for logs, 1000 for transactions.
  • Overflow policy: drop_oldest.
  • Batch size: 1 for errors and check-ins (immediate send), 100 for logs.
  • Batch timeout: 5 seconds for logs.

Scheduler Options

  • Priority weights: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1.
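The defaults above can be collected into a single options struct, sketched below (field names are illustrative, not the sentry-go API):

```go
package main

import (
	"fmt"
	"time"
)

// processorOptions gathers the default configuration listed above.
type processorOptions struct {
	transportCapacity int
	errorBufferSize   int
	logBatchSize      int
	logBufferSize     int
	logBatchTimeout   time.Duration
	spanBufferSize    int
	priorityWeights   map[string]int
}

func defaultOptions() processorOptions {
	const logBatchSize = 100
	return processorOptions{
		transportCapacity: 1000,
		errorBufferSize:   100, // also used for check-ins
		logBatchSize:      logBatchSize,
		logBufferSize:     10 * logBatchSize, // 10*BATCH_SIZE
		logBatchTimeout:   5 * time.Second,
		spanBufferSize:    1000,
		priorityWeights: map[string]int{
			"CRITICAL": 5, "HIGH": 4, "MEDIUM": 3, "LOW": 2, "LOWEST": 1,
		},
	}
}

func main() {
	opts := defaultOptions()
	fmt.Println(opts.logBufferSize) // 1000
}
```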

Implementation Example (Go)

The sentry-go SDK provides a reference implementation of this architecture:

Storage Interface

```go
type Storage[T any] interface {
    // Core operations
    Offer(item T) bool
    Poll() (T, bool)
    PollBatch(maxItems int) []T
    PollIfReady() []T
    Drain() []T
    Peek() (T, bool)

    // State queries
    Size() int
    Capacity() int
    IsEmpty() bool
    IsFull() bool
    Utilization() float64

    // Flush management
    IsReadyToFlush() bool
    MarkFlushed()

    // Category/Priority
    Category() ratelimit.Category
    Priority() ratelimit.Priority
}
```


```go
// Single-item buffer: returns a batch once the batch size or timeout
// threshold is reached.
func (b *RingBuffer[T]) PollIfReady() []T {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.size == 0 {
		return nil
	}

	ready := b.size >= b.batchSize ||
		(b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout)

	if !ready {
		return nil
	}

	itemCount := b.batchSize
	if itemCount > b.size {
		itemCount = b.size
	}

	result := make([]T, itemCount)
	var zero T

	for i := 0; i < itemCount; i++ {
		result[i] = b.items[b.head]
		b.items[b.head] = zero
		b.head = (b.head + 1) % b.capacity
		b.size--
	}

	b.lastFlushTime = time.Now()
	return result
}
```

```go
// Bucketed buffer: flushes the oldest trace's bucket as a whole.
func (b *BucketedBuffer[T]) PollIfReady() []T {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.bucketCount == 0 {
		return nil
	}
	// The batchSize threshold is satisfied based on total items.
	ready := b.totalItems >= b.batchSize || (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout)
	if !ready {
		return nil
	}
	// Always flush the oldest bucket (FIFO by trace).
	oldest := b.buckets[b.head]
	if oldest == nil {
		return nil
	}
	items := oldest.items
	if oldest.traceID != "" {
		delete(b.traceIndex, oldest.traceID)
	}
	b.buckets[b.head] = nil
	b.head = (b.head + 1) % b.bucketCapacity
	b.totalItems -= len(items)
	b.bucketCount--
	b.lastFlushTime = time.Now()
	return items
}
```

TelemetryScheduler Processing

```go
func (s *TelemetryScheduler) run() {
	for {
		s.mu.Lock()
		// Sleep until signaled that a new item was added.
		for !s.hasWork() && s.ctx.Err() == nil {
			s.cond.Wait()
		}
		s.mu.Unlock()

		// Exit the worker once the scheduler's context is cancelled.
		if s.ctx.Err() != nil {
			return
		}
		s.processNextBatch()
	}
}

func (s *TelemetryScheduler) hasWork() bool {
	for _, buffer := range s.buffers {
		if buffer.IsReadyToFlush() {
			return true
		}
	}
	return false
}

func (s *TelemetryScheduler) processNextBatch() {
	if len(s.currentCycle) == 0 {
		return
	}

	// Advance through the weighted priority cycle.
	priority := s.currentCycle[s.cyclePos]
	s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle)

	var bufferToProcess TelemetryBuffer[protocol.EnvelopeItemConvertible]
	var categoryToProcess ratelimit.Category
	for category, buffer := range s.buffers {
		if buffer.Priority() == priority && buffer.IsReadyToFlush() {
			bufferToProcess = buffer
			categoryToProcess = category
			break
		}
	}

	if bufferToProcess != nil {
		s.processItems(bufferToProcess, categoryToProcess, false)
	}
}
```

Flushing

```go
func (s *TelemetryScheduler) flush() {
	// Process all buffers and send their contents to the transport.
	for category, buffer := range s.buffers {
		if !buffer.IsEmpty() {
			s.processItems(buffer, category, true)
		}
	}
}

// The Buffer exposes a Flush method that drains the scheduler and then
// flushes the transport within the given timeout.
func (b *Buffer) Flush(timeout time.Duration) {
	b.scheduler.flush()
	b.transport.flush(timeout)
}
```