---
title: Backend Telemetry Processor
description: Detailed backend telemetry processor design.
sidebar_order: 1
---
For the common specification, refer to the Telemetry Processor page. This page describes the backend-specific implementation, which differs from the common design in two key ways:
- Weighted round-robin scheduling: Backend applications often run under sustained high load. Weighted scheduling ensures critical telemetry (errors) gets sent even when flooded with high-volume data (logs, spans).
- Signal-based scheduling: The scheduler wakes when new data arrives rather than polling, reducing CPU overhead in idle periods.
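For illustration, signal-based wakeup can be implemented with a condition variable that the `captureX` methods poke after buffering an item. This is a minimal sketch; the capture-side hook and constructor are assumptions, though the `mu`/`cond` fields mirror the reference implementation shown later on this page:

```go
import "sync"

// Sketch of the scheduler's wakeup primitive.
type TelemetryScheduler struct {
    mu   sync.Mutex
    cond *sync.Cond
}

func NewTelemetryScheduler() *TelemetryScheduler {
    s := &TelemetryScheduler{}
    s.cond = sync.NewCond(&s.mu)
    return s
}

// signal wakes the event loop; captureX calls this (hypothetical
// hook) after offering a new item to a buffer.
func (s *TelemetryScheduler) signal() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.cond.Signal()
}
```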
Data categories are grouped into five priority levels:
- CRITICAL: Error, Feedback.
- HIGH: Session, CheckIn.
- MEDIUM: Transaction, ClientReport, Span.
- LOW: Log, Profile, ProfileChunk.
- LOWEST: Replay.
The share of scheduling each priority receives is configurable via weights.
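As a sketch, the priority levels and their default weights might be expressed like this (the `Priority` type and constant names are illustrative, not necessarily the sentry-go API; the weight values match the defaults listed later on this page):

```go
type Priority int

const (
    PriorityLowest Priority = iota
    PriorityLow
    PriorityMedium
    PriorityHigh
    PriorityCritical
)

// Higher weight means more turns in the round-robin cycle.
// These defaults are overridable via configuration.
var defaultWeights = map[Priority]int{
    PriorityCritical: 5, // Error, Feedback
    PriorityHigh:     4, // Session, CheckIn
    PriorityMedium:   3, // Transaction, ClientReport, Span
    PriorityLow:      2, // Log, Profile, ProfileChunk
    PriorityLowest:   1, // Replay
}
```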
Unlike the push-based approach, the telemetry buffer does not push data to the telemetry scheduler. Instead, the telemetry scheduler iterates through all buffers using weighted round-robin and pulls data when it determines a buffer is ready to flush. The telemetry buffer MUST still follow the common telemetry buffer requirements. The additional backend-specific requirements are:
- The telemetry buffer SHOULD drop older items as the overflow policy. It MAY also drop newer items to preserve what's already buffered.
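For example, a ring buffer's `Offer` can implement the drop-oldest policy by overwriting the head slot when full. This sketch assumes the same fields the `RingBuffer` reference code below uses (`mu`, `items`, `head`, `size`, `capacity`):

```go
// Offer inserts an item, evicting the oldest buffered item when
// the ring is full (the drop-oldest overflow policy).
func (b *RingBuffer[T]) Offer(item T) bool {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.size == b.capacity {
        // Drop the oldest item by advancing the head.
        b.head = (b.head + 1) % b.capacity
        b.size--
    }
    tail := (b.head + b.size) % b.capacity
    b.items[tail] = item
    b.size++
    return true
}
```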
On the backend, use the same size limits as the common requirements, except for spans, where we recommend 1000 because span volume is higher.
The span buffer MUST follow the common telemetry span buffer requirements. Additional requirements for the bucketed-by-trace buffer are:
- The span buffer MAY use FIFO ordering to prioritize forwarding: always forward spans belonging to the oldest trace ID first. FIFO prevents spans from lingering in the buffer.

A small subset of cases can still result in partial traces: either an old trace bucket was dropped and a new span for the same trace arrived afterwards, or an incoming span of an existing trace was dropped. The preferred overflow behavior in most cases is `drop_oldest`, since it results in the fewest incomplete traces across these two scenarios.
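A sketch of the corresponding insert path: spans are grouped into per-trace buckets, and when no bucket slot is free the oldest bucket is evicted. The `spanBucket` type and the `Offer` signature (taking an explicit trace ID) are assumptions; the remaining fields mirror the `BucketedBuffer` reference code below:

```go
type spanBucket[T any] struct {
    traceID string
    items   []T
}

// Offer appends a span to its trace's bucket, evicting the oldest
// bucket (drop_oldest) when the buffer is full of other traces.
func (b *BucketedBuffer[T]) Offer(item T, traceID string) bool {
    b.mu.Lock()
    defer b.mu.Unlock()
    if bucket, ok := b.traceIndex[traceID]; ok {
        bucket.items = append(bucket.items, item)
        b.totalItems++
        return true
    }
    if b.bucketCount == b.bucketCapacity {
        // Evict the oldest trace; its spans are dropped, which is
        // one of the two partial-trace cases described above.
        oldest := b.buckets[b.head]
        delete(b.traceIndex, oldest.traceID)
        b.totalItems -= len(oldest.items)
        b.buckets[b.head] = nil
        b.head = (b.head + 1) % b.bucketCapacity
        b.bucketCount--
    }
    tail := (b.head + b.bucketCount) % b.bucketCapacity
    bucket := &spanBucket[T]{traceID: traceID, items: []T{item}}
    b.buckets[tail] = bucket
    b.traceIndex[traceID] = bucket
    b.bucketCount++
    b.totalItems++
    return true
}
```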
Buffers are mapped to DataCategories, which determine their scheduling priority and rate limits.
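For instance, the scheduler might hold its buffers keyed by category (illustrative wiring; the `ratelimit` category constants and the buffer variables here are assumptions):

```go
s.buffers = map[ratelimit.Category]Storage[protocol.EnvelopeItemConvertible]{
    ratelimit.CategoryError:       errorBuffer,       // Priority: CRITICAL
    ratelimit.CategoryTransaction: transactionBuffer, // Priority: MEDIUM
    ratelimit.CategoryLog:         logBuffer,         // Priority: LOW
}
```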
The TelemetryScheduler runs as a background worker, coordinating the flow of telemetry from buffers to the transport:
- Initialization: Constructs a weighted priority cycle (e.g., `[CRITICAL×5, HIGH×4, MEDIUM×3, ...]`) based on the configured weights (see the sketch after this list).
- Event loop: Wakes when explicitly signaled from the `captureX` methods on the client when new data is available (if the language does not support this, a periodic ticker can be used instead).
- Buffer selection: Iterates through the priority cycle, selecting buffers that are ready to flush and not rate limited.
- Rate limit coordination: Queries the transport's rate limit state before attempting to send any category.
- Envelope construction: Converts buffered items into Sentry protocol envelopes.
  - Log items are batched together into a single envelope with multiple log entries.
  - Other categories typically send one item per envelope.
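The weighted cycle from the initialization step can be built by repeating each priority weight-many times. A minimal sketch, reusing the illustrative `Priority` names from the earlier sketch:

```go
// buildPriorityCycle expands the weights into a flat cycle, e.g.
// {CRITICAL: 5, HIGH: 4, ...} yields
// [CRITICAL, CRITICAL, CRITICAL, CRITICAL, CRITICAL, HIGH, ...].
func buildPriorityCycle(weights map[Priority]int) []Priority {
    order := []Priority{
        PriorityCritical, PriorityHigh, PriorityMedium,
        PriorityLow, PriorityLowest,
    }
    var cycle []Priority
    for _, p := range order {
        for i := 0; i < weights[p]; i++ {
            cycle = append(cycle, p)
        }
    }
    return cycle
}
```

Iterating this cycle gives CRITICAL buffers five turns for every LOWEST turn, which is what keeps errors flowing while the SDK is flooded with logs or spans.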
The transport layer handles HTTP communication with Sentry's ingestion endpoints.
The Buffer is the only layer responsible for dropping events: if the transport's queue is full, the Buffer drops the batch.
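A sketch of that hand-off, assuming the transport exposes a bounded queue (the `transportQueue` channel, the `recordDropped` helper, and the `protocol.Envelope` type name are illustrative):

```go
// sendOrDrop hands an envelope to the transport's bounded queue.
// The transport itself never drops; if its queue is full, the
// batch is dropped here and accounted for (e.g. via client reports).
func (s *TelemetryScheduler) sendOrDrop(
    envelope *protocol.Envelope,
    category ratelimit.Category,
    itemCount int,
) {
    select {
    case s.transportQueue <- envelope:
        // Queued for asynchronous HTTP delivery.
    default:
        s.recordDropped(category, itemCount)
    }
}
```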
Default configuration:
- Transport queue capacity: 1000 items.
- Buffer capacity: 100 items for errors and check-ins, 10 × BATCH_SIZE for logs, 1000 for transactions.
- Overflow policy: `drop_oldest`.
- Batch size: 1 for errors and check-ins (immediate send), 100 for logs.
- Batch timeout: 5 seconds for logs.
- Priority weights: CRITICAL=5, HIGH=4, MEDIUM=3, LOW=2, LOWEST=1.
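Expressed as code, the defaults might be collected like this (a sketch; the struct, string keys, and constant names are illustrative, while the values are the defaults listed above):

```go
import "time"

const transportQueueCapacity = 1000 // items

type BufferDefaults struct {
    Capacity     int
    BatchSize    int
    BatchTimeout time.Duration
}

var bufferDefaults = map[string]BufferDefaults{
    "error":       {Capacity: 100, BatchSize: 1}, // immediate send
    "check_in":    {Capacity: 100, BatchSize: 1}, // immediate send
    "log_item":    {Capacity: 10 * 100, BatchSize: 100, BatchTimeout: 5 * time.Second}, // 10 × BATCH_SIZE
    "transaction": {Capacity: 1000},
}
```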
The sentry-go SDK provides a reference implementation of this architecture:
```go
type Storage[T any] interface {
    // Core operations
    Offer(item T) bool
    Poll() (T, bool)
    PollBatch(maxItems int) []T
    PollIfReady() []T
    Drain() []T
    Peek() (T, bool)

    // State queries
    Size() int
    Capacity() int
    IsEmpty() bool
    IsFull() bool
    Utilization() float64

    // Flush management
    IsReadyToFlush() bool
    MarkFlushed()

    // Category/Priority
    Category() ratelimit.Category
    Priority() ratelimit.Priority
}
```
```go
// Single item buffer
func (b *RingBuffer[T]) PollIfReady() []T {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.size == 0 {
        return nil
    }
    // Ready when a full batch is buffered or the batch timeout expired.
    ready := b.size >= b.batchSize ||
        (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout)
    if !ready {
        return nil
    }
    itemCount := b.batchSize
    if itemCount > b.size {
        itemCount = b.size
    }
    // Copy out up to one batch, zeroing slots so items can be GC'd.
    result := make([]T, itemCount)
    var zero T
    for i := 0; i < itemCount; i++ {
        result[i] = b.items[b.head]
        b.items[b.head] = zero
        b.head = (b.head + 1) % b.capacity
        b.size--
    }
    b.lastFlushTime = time.Now()
    return result
}
```
```go
// Bucketed buffer
func (b *BucketedBuffer[T]) PollIfReady() []T {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.bucketCount == 0 {
        return nil
    }
    // The batch size is satisfied based on total items across all buckets.
    ready := b.totalItems >= b.batchSize ||
        (b.timeout > 0 && time.Since(b.lastFlushTime) >= b.timeout)
    if !ready {
        return nil
    }
    // Always flush the oldest bucket first (FIFO by trace).
    oldest := b.buckets[b.head]
    if oldest == nil {
        return nil
    }
    items := oldest.items
    if oldest.traceID != "" {
        delete(b.traceIndex, oldest.traceID)
    }
    b.buckets[b.head] = nil
    b.head = (b.head + 1) % b.bucketCapacity
    b.totalItems -= len(items)
    b.bucketCount--
    b.lastFlushTime = time.Now()
    return items
}
```

```go
func (s *TelemetryScheduler) run() {
    for {
        s.mu.Lock()
        // Sleep until a captureX call signals that an item was added.
        for !s.hasWork() && s.ctx.Err() == nil {
            s.cond.Wait()
        }
        s.mu.Unlock()
        // Stop the event loop once the scheduler's context is cancelled.
        if s.ctx.Err() != nil {
            return
        }
        s.processNextBatch()
    }
}

func (s *TelemetryScheduler) hasWork() bool {
    for _, buffer := range s.buffers {
        if buffer.IsReadyToFlush() {
            return true
        }
    }
    return false
}

func (s *TelemetryScheduler) processNextBatch() {
    if len(s.currentCycle) == 0 {
        return
    }
    // Take the next priority from the weighted cycle and advance.
    priority := s.currentCycle[s.cyclePos]
    s.cyclePos = (s.cyclePos + 1) % len(s.currentCycle)
    var bufferToProcess TelemetryBuffer[protocol.EnvelopeItemConvertible]
    var categoryToProcess ratelimit.Category
    for category, buffer := range s.buffers {
        if buffer.Priority() == priority && buffer.IsReadyToFlush() {
            bufferToProcess = buffer
            categoryToProcess = category
            break
        }
    }
    if bufferToProcess != nil {
        s.processItems(bufferToProcess, categoryToProcess, false)
    }
}

func (s *TelemetryScheduler) flush() {
    // Process all buffers and send their contents to the transport.
    for category, buffer := range s.buffers {
        if !buffer.IsEmpty() {
            s.processItems(buffer, category, true)
        }
    }
}
```
```go
// The Buffer exposes a Flush method that drains the scheduler
// and then the transport.
func (b *Buffer) Flush(timeout time.Duration) {
    b.scheduler.flush()
    b.transport.Flush(timeout)
}
```
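Illustrative usage at shutdown (the `buffer` variable is assumed to be the SDK's configured Buffer):

```go
// Drain buffered telemetry before the process exits; the timeout
// bounds how long Flush waits on in-flight HTTP requests.
buffer.Flush(2 * time.Second)
```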