-
Notifications
You must be signed in to change notification settings - Fork 66
Optimize telemetry event publishing #297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,14 +2,11 @@ package events | |
|
|
||
| import ( | ||
| "fmt" | ||
| "sync" | ||
| ) | ||
|
|
||
| // EventStream is the process-lifetime event bus. It owns the ring buffer and | ||
| // sequence counter, which outlive individual capture sessions. | ||
| // EventStream is the process-lifetime event bus. Its ring buffer and sequence | ||
| // counter outlive individual capture sessions. | ||
| type EventStream struct { | ||
| mu sync.Mutex | ||
| seq uint64 | ||
| ring *ringBuffer | ||
| } | ||
|
|
||
|
|
@@ -29,14 +26,7 @@ func NewEventStream(cfg EventStreamConfig) (*EventStream, error) { | |
| // Publish assigns a monotonically increasing seq to env, truncates oversized | ||
| // payloads, and pushes it to the ring buffer. | ||
| func (es *EventStream) Publish(env Envelope) Envelope { | ||
| es.mu.Lock() | ||
| es.seq++ | ||
| env.Seq = es.seq | ||
| es.mu.Unlock() | ||
|
|
||
| env, _ = truncateIfNeeded(env) | ||
| es.ring.publish(env) | ||
| return env | ||
| return es.ring.publishNext(env) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Truncation checks size before seqMedium Severity
Additional Locations (1)Reviewed by Cursor Bugbot for commit 140f7c2. Configure here. |
||
| } | ||
|
|
||
| // NewReader returns a Reader positioned after afterSeq. Pass 0 to start from | ||
|
|
@@ -47,7 +37,5 @@ func (es *EventStream) NewReader(afterSeq uint64) *Reader { | |
|
|
||
| // Seq returns the sequence number of the last published event. | ||
| func (es *EventStream) Seq() uint64 { | ||
| es.mu.Lock() | ||
| defer es.mu.Unlock() | ||
| return es.seq | ||
| return es.ring.seq() | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| package events | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "testing" | ||
|
|
||
| oapi "github.com/kernel/kernel-images/server/lib/oapi" | ||
| ) | ||
|
|
||
| func benchmarkEvent() Event { | ||
| return Event{ | ||
| Ts: 123456789, | ||
| Type: "console.log", | ||
| Category: Console, | ||
| Source: oapi.BrowserEventSource{Kind: oapi.Cdp}, | ||
| Data: json.RawMessage(`{"message":"hello","level":"log"}`), | ||
| } | ||
| } | ||
|
|
||
| func BenchmarkEventStreamPublish(b *testing.B) { | ||
| es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024}) | ||
| if err != nil { | ||
| b.Fatal(err) | ||
| } | ||
| ev := benchmarkEvent() | ||
| b.ReportAllocs() | ||
| b.ResetTimer() | ||
| for i := 0; i < b.N; i++ { | ||
| es.Publish(Envelope{Event: ev}) | ||
| } | ||
| } | ||
|
|
||
| func BenchmarkEventStreamPublishRead(b *testing.B) { | ||
| es, err := NewEventStream(EventStreamConfig{RingCapacity: 1024}) | ||
| if err != nil { | ||
| b.Fatal(err) | ||
| } | ||
| reader := es.NewReader(0) | ||
| ev := benchmarkEvent() | ||
| b.ReportAllocs() | ||
| b.ResetTimer() | ||
| for i := 0; i < b.N; i++ { | ||
| es.Publish(Envelope{Event: ev}) | ||
| res, ok := reader.TryRead() | ||
| if !ok || res.Envelope == nil { | ||
| b.Fatal("expected envelope") | ||
| } | ||
| } | ||
| } |


Uh oh!
There was an error while loading. Please reload this page.