Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions server/lib/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ import (
// maxS2RecordBytes is the maximum record size for the S2 event pipeline (1 MB).
const maxS2RecordBytes = 1_000_000

// Below this conservative size estimate, JSON overhead cannot push a record
// near the S2 limit, so Publish can avoid a full marshal on the hot path.
const truncateMarshalThreshold = maxS2RecordBytes - 64*1024

const (
jsonEnvelopeOverhead = 512
jsonMetadataEntryOverhead = 16
)

const (
Console = oapi.TelemetryEventCategory("console")
Network = oapi.TelemetryEventCategory("network")
Expand Down Expand Up @@ -97,8 +106,11 @@ type Envelope struct {

// truncateIfNeeded marshals env and returns the (possibly truncated) envelope.
// If the envelope still exceeds maxS2RecordBytes after nulling data (e.g. huge
// source.metadata), it is returned as-is, callers must handle nil data.
// source.metadata), it is returned as-is with data set to null.
func truncateIfNeeded(env Envelope) (Envelope, []byte) {
if estimatedEnvelopeBytes(env) < truncateMarshalThreshold {
return env, nil
}
Comment thread
cursor[bot] marked this conversation as resolved.
data, err := json.Marshal(env)
if err != nil {
return env, nil
Expand All @@ -113,7 +125,27 @@ func truncateIfNeeded(env Envelope) (Envelope, []byte) {
return env, nil
}
if len(data) > maxS2RecordBytes {
slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "seq", env.Seq, "size", len(data))
slog.Warn("truncateIfNeeded: envelope exceeds limit even without data", "size", len(data))
}
return env, data
}

func estimatedEnvelopeBytes(env Envelope) int {
n := jsonEnvelopeOverhead + len(env.Event.Data)
n += maxEscapedJSONLen(env.Event.Type)
n += maxEscapedJSONLen(string(env.Event.Category))
n += maxEscapedJSONLen(string(env.Event.Source.Kind))
if env.Event.Source.Event != nil {
n += maxEscapedJSONLen(*env.Event.Source.Event)
}
if env.Event.Source.Metadata != nil {
for k, v := range *env.Event.Source.Metadata {
n += jsonMetadataEntryOverhead + maxEscapedJSONLen(k) + maxEscapedJSONLen(v)
}
}
return n
}

func maxEscapedJSONLen(s string) int {
return len(s) * 6
}
107 changes: 107 additions & 0 deletions server/lib/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package events
import (
"context"
"encoding/json"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -118,6 +119,44 @@ func TestEventOmitEmpty(t *testing.T) {
assert.NotContains(t, s, `"event"`)
}

func TestTruncateIfNeededChecksLargeMetadata(t *testing.T) {
metadata := map[string]string{"large": strings.Repeat("x", maxS2RecordBytes)}
env := Envelope{
Event: Event{
Type: "console.log",
Category: Console,
Source: oapi.BrowserEventSource{Kind: oapi.Cdp, Metadata: &metadata},
Data: json.RawMessage(`{"message":"hello"}`),
},
}

truncated, data := truncateIfNeeded(env)

require.NotNil(t, data)
assert.True(t, truncated.Event.Truncated)
assert.Equal(t, json.RawMessage("null"), truncated.Event.Data)
assert.Greater(t, len(data), maxS2RecordBytes)
}

func TestTruncateIfNeededChecksEscapedMetadata(t *testing.T) {
metadata := map[string]string{"escaped": strings.Repeat("<", maxS2RecordBytes/5)}
env := Envelope{
Event: Event{
Type: "console.log",
Category: Console,
Source: oapi.BrowserEventSource{Kind: oapi.Cdp, Metadata: &metadata},
Data: json.RawMessage(`{"message":"hello"}`),
},
}

truncated, data := truncateIfNeeded(env)

require.NotNil(t, data)
assert.True(t, truncated.Event.Truncated)
assert.Equal(t, json.RawMessage("null"), truncated.Event.Data)
assert.Greater(t, len(data), maxS2RecordBytes)
}

func mkEnv(seq uint64, ev Event) Envelope {
return Envelope{Seq: seq, Event: ev}
}
Expand All @@ -133,6 +172,74 @@ func newTestRingBuffer(t *testing.T, capacity int) *ringBuffer {
return rb
}

func TestEventStreamPublishAssignsSeq(t *testing.T) {
es, err := NewEventStream(EventStreamConfig{RingCapacity: 10})
require.NoError(t, err)
reader := es.NewReader(0)

first := es.Publish(Envelope{Event: cdpEvent("console.log", Console)})
second := es.Publish(Envelope{Event: cdpEvent("network.request", Network)})

assert.Equal(t, uint64(1), first.Seq)
assert.Equal(t, uint64(2), second.Seq)
assert.Equal(t, uint64(2), es.Seq())

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
assert.Equal(t, uint64(1), readEnvelope(t, reader, ctx).Seq)
assert.Equal(t, uint64(2), readEnvelope(t, reader, ctx).Seq)
}

func TestEventStreamPublishTruncatesWithAssignedSeq(t *testing.T) {
es, err := NewEventStream(EventStreamConfig{RingCapacity: 10})
require.NoError(t, err)
nextSeq := uint64(1_000_000_000_000_000_000)
es.ring.latestSeq = nextSeq - 1

env := Envelope{
Event: Event{
Type: "console.log",
Category: Console,
Source: oapi.BrowserEventSource{Kind: oapi.Cdp},
Data: json.RawMessage(`""`),
},
}
seqZeroLen := marshaledEnvelopeLen(t, env)
env.Seq = nextSeq
nextSeqLen := marshaledEnvelopeLen(t, env)
payloadLen := maxS2RecordBytes - seqZeroLen
payload := json.RawMessage(`"` + strings.Repeat("x", payloadLen) + `"`)
seqZeroEnv := Envelope{Event: Event{
Type: "console.log",
Category: Console,
Source: oapi.BrowserEventSource{Kind: oapi.Cdp},
Data: payload,
}}
nextSeqEnv := seqZeroEnv
nextSeqEnv.Seq = nextSeq
require.LessOrEqual(t, marshaledEnvelopeLen(t, seqZeroEnv), maxS2RecordBytes)
require.Greater(t, marshaledEnvelopeLen(t, nextSeqEnv), maxS2RecordBytes)
require.Greater(t, nextSeqLen-seqZeroLen, 0)

published := es.Publish(Envelope{Event: Event{
Type: "console.log",
Category: Console,
Source: oapi.BrowserEventSource{Kind: oapi.Cdp},
Data: payload,
}})

assert.Equal(t, nextSeq, published.Seq)
assert.True(t, published.Event.Truncated)
assert.Equal(t, json.RawMessage("null"), published.Event.Data)
}

func marshaledEnvelopeLen(t *testing.T, env Envelope) int {
t.Helper()
data, err := json.Marshal(env)
require.NoError(t, err)
return len(data)
}

// TestRingBuffer: publish 3 envelopes; reader reads all 3 in order
func TestRingBuffer(t *testing.T) {
rb := newTestRingBuffer(t, 10)
Expand Down
20 changes: 4 additions & 16 deletions server/lib/events/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package events

import (
"fmt"
"sync"
)

// EventStream is the process-lifetime event bus. It owns the ring buffer and
// sequence counter, which outlive individual capture sessions.
// EventStream is the process-lifetime event bus. Its ring buffer and sequence
// counter outlive individual capture sessions.
type EventStream struct {
mu sync.Mutex
seq uint64
ring *ringBuffer
}

Expand All @@ -29,14 +26,7 @@ func NewEventStream(cfg EventStreamConfig) (*EventStream, error) {
// Publish assigns a monotonically increasing seq to env, truncates oversized
// payloads, and pushes it to the ring buffer.
func (es *EventStream) Publish(env Envelope) Envelope {
es.mu.Lock()
es.seq++
env.Seq = es.seq
es.mu.Unlock()

env, _ = truncateIfNeeded(env)
es.ring.publish(env)
return env
return es.ring.publishNext(env)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Truncation checks size before seq

Medium Severity

EventStream.Publish now runs truncateIfNeeded before publishNext assigns seq, so the slow-path size check marshals with "seq":0. Envelopes whose JSON length is just under maxS2RecordBytes at seq zero can pass without truncation, then exceed the S2 1 MiB cap once the real monotonic seq is written and consumers remarshal for SSE or storage.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 140f7c2. Configure here.

}

// NewReader returns a Reader positioned after afterSeq. Pass 0 to start from
Expand All @@ -47,7 +37,5 @@ func (es *EventStream) NewReader(afterSeq uint64) *Reader {

// Seq returns the sequence number of the last published event.
func (es *EventStream) Seq() uint64 {
es.mu.Lock()
defer es.mu.Unlock()
return es.seq
return es.ring.seq()
}
49 changes: 49 additions & 0 deletions server/lib/events/eventstream_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package events

import (
"encoding/json"
"testing"

oapi "github.com/kernel/kernel-images/server/lib/oapi"
)

func benchmarkEvent() Event {
return Event{
Ts: 123456789,
Type: "console.log",
Category: Console,
Source: oapi.BrowserEventSource{Kind: oapi.Cdp},
Data: json.RawMessage(`{"message":"hello","level":"log"}`),
}
}

func BenchmarkEventStreamPublish(b *testing.B) {
es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024})
if err != nil {
b.Fatal(err)
}
ev := benchmarkEvent()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
es.Publish(Envelope{Event: ev})
}
}

func BenchmarkEventStreamPublishRead(b *testing.B) {
es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024})
if err != nil {
b.Fatal(err)
}
reader := es.NewReader(0)
ev := benchmarkEvent()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
es.Publish(Envelope{Event: ev})
res, ok := reader.TryRead()
if !ok || res.Envelope == nil {
b.Fatal("expected envelope")
}
}
}
Loading
Loading