-
Notifications
You must be signed in to change notification settings - Fork 52
[kernel-1116] browser events: add external events #227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
76d837f
5c0f7ae
ad2dd63
365787c
d086571
3339cac
64e67b1
048f9c6
e57e58a
886b893
06d2470
6bb5402
cb45a55
02ee74e
3523994
cb1a1a7
c9e78a3
1cddf53
c6dd362
ff8bddf
a8bdeaf
8f88ed0
3eaacb3
214858a
b06132a
8ebb5e3
4bcba48
1295232
a4fd0d6
d6b348b
3da65c3
a7b2e54
85d570a
bed53f8
d73793c
348243a
a62c403
0605227
33c07d3
9c6e066
5dd9273
7550bc1
f3d3166
1cfbc5e
2e0c4a0
bf4b04c
90a3ae1
4feef7e
8e94162
5465e59
7c4c654
bca495b
fd4d4d3
ad28202
1f45379
ff2a221
83d118f
0a89c58
f97e3a9
4cb8f0e
55abde4
232eb37
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,166 @@ | ||
| package api | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| "github.com/kernel/kernel-images/server/lib/events" | ||
| oapi "github.com/kernel/kernel-images/server/lib/oapi" | ||
| ) | ||
|
|
||
| // PublishEvent handles POST /events/publish. | ||
| // Injects a caller-supplied event into the active capture session. Returns 400 | ||
| // if no session is active or the event fails validation. | ||
| func (s *ApiService) PublishEvent(_ context.Context, req oapi.PublishEventRequestObject) (oapi.PublishEventResponseObject, error) { | ||
| if !s.captureSession.Active() { | ||
| return oapi.PublishEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "no active capture session"}}, nil | ||
| } | ||
|
|
||
| body := req.Body | ||
| if body == nil || body.Type == "" { | ||
| return oapi.PublishEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "type is required"}}, nil | ||
| } | ||
| if body.Type == events.TypeSessionEnded || body.Type == events.TypeEventsDropped { | ||
| return oapi.PublishEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "type is reserved"}}, nil | ||
| } | ||
|
|
||
| ev := events.Event{Type: body.Type} | ||
|
|
||
| if body.Ts != nil { | ||
| ev.Ts = *body.Ts | ||
| } | ||
| if body.Category != nil { | ||
| cat := events.EventCategory(*body.Category) | ||
| if !events.ValidCategory(cat) { | ||
| return oapi.PublishEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "invalid category"}}, nil | ||
| } | ||
| ev.Category = cat | ||
| } else { | ||
| ev.Category = events.CategorySystem | ||
| } | ||
|
archandatta marked this conversation as resolved.
|
||
|
|
||
| // Enforce source.kind = KindKernelAPI so callers can't spoof the origin. | ||
| ev.Source.Kind = events.KindKernelAPI | ||
| if body.Source != nil { | ||
| if body.Source.Event != nil { | ||
| ev.Source.Event = *body.Source.Event | ||
| } | ||
| if body.Source.Metadata != nil { | ||
| ev.Source.Metadata = *body.Source.Metadata | ||
| } | ||
| } | ||
|
|
||
| if body.Data != nil { | ||
| // re-marshal body.Data to normalize it into a canonical RawMessage byte slice. | ||
| data, err := json.Marshal(body.Data) | ||
| if err != nil { | ||
| return oapi.PublishEvent400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "invalid data"}}, nil | ||
| } | ||
| ev.Data = json.RawMessage(data) | ||
| } | ||
|
|
||
| s.captureSession.PublishUnfiltered(ev) | ||
| return oapi.PublishEvent200Response{}, nil | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PublishEvent returns 200 when event silently droppedLow Severity
Reviewed by Cursor Bugbot for commit 55abde4. Configure here. |
||
| } | ||
|
|
||
| // StreamEvents handles GET /events/stream. | ||
| // Opens an SSE stream of envelopes from the active capture session's ring buffer. | ||
| // Supports reconnection via the Last-Event-ID header. Emits a keepalive comment | ||
| // frame every 15 s when no event arrives, and exits cleanly on session_ended. | ||
| func (s *ApiService) StreamEvents(ctx context.Context, req oapi.StreamEventsRequestObject) (oapi.StreamEventsResponseObject, error) { | ||
| if !s.captureSession.Active() { | ||
| return oapi.StreamEvents400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "no active capture session"}}, nil | ||
| } | ||
|
|
||
| afterSeq := uint64(0) | ||
| if id := req.Params.LastEventID; id != nil && *id != "" { | ||
| // Invalid/non-numeric values fall back to 0, replaying all events from the start. | ||
| // Note: seq is per capture session and resets on each Start(). A Last-Event-ID | ||
| // from a previous session may silently overlap with the current session's seqs. | ||
| if n, err := strconv.ParseUint(*id, 10, 64); err == nil { | ||
| afterSeq = n | ||
| } | ||
| } | ||
|
|
||
| sessionID := s.captureSession.ID() | ||
| reader := s.captureSession.NewReader(afterSeq) | ||
|
archandatta marked this conversation as resolved.
|
||
|
|
||
| pr, pw := io.Pipe() | ||
| go func() { | ||
| defer pw.Close() | ||
| for { | ||
| readCtx, cancel := context.WithTimeout(ctx, 15*time.Second) | ||
| result, err := reader.Read(readCtx) | ||
| cancel() | ||
| if err != nil { | ||
| if errors.Is(err, context.DeadlineExceeded) { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| default: | ||
| // No event in 15 s and client still connected, send keepalive. | ||
| if _, err := pw.Write([]byte(":\n\n")); err != nil { | ||
| return | ||
| } | ||
| continue | ||
| } | ||
| } | ||
| return | ||
| } | ||
|
|
||
| if result.Dropped > 0 { | ||
| env := events.Envelope{ | ||
| CaptureSessionID: sessionID, | ||
| Seq: 0, | ||
| Event: events.Event{ | ||
| Ts: time.Now().UnixMicro(), | ||
| Type: events.TypeEventsDropped, | ||
| Category: events.CategorySystem, | ||
| Source: events.Source{Kind: events.KindKernelAPI}, | ||
| Data: json.RawMessage(fmt.Sprintf(`{"dropped":%d}`, result.Dropped)), | ||
| }, | ||
| } | ||
| // Omit the id: field so the client's Last-Event-ID is not overwritten. | ||
| if err := writeEnvelopeFrame(pw, nil, env); err != nil { | ||
| return | ||
| } | ||
| continue | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| } | ||
|
|
||
| env := result.Envelope | ||
| if err := writeEnvelopeFrame(pw, &env.Seq, *env); err != nil { | ||
| return | ||
| } | ||
| if env.Event.Type == events.TypeSessionEnded { | ||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| headers := oapi.StreamEvents200ResponseHeaders{XSSEContentType: "application/json"} | ||
| return oapi.StreamEvents200TexteventStreamResponse{Body: pr, Headers: headers}, nil | ||
| } | ||
|
|
||
| // writeEnvelopeFrame writes a single SSE frame. If seq is non-nil it is | ||
| // emitted as the id: field, updating the client's Last-Event-ID. | ||
| func writeEnvelopeFrame(w io.Writer, seq *uint64, env events.Envelope) error { | ||
| data, err := json.Marshal(env) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| var buf bytes.Buffer | ||
| if seq != nil { | ||
| fmt.Fprintf(&buf, "id: %d\n", *seq) | ||
| } | ||
| buf.WriteString("data: ") | ||
| buf.Write(data) | ||
| buf.WriteString("\n\n") | ||
| _, err = w.Write(buf.Bytes()) | ||
| return err | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,92 @@ | ||
| package api | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "context" | ||
| "encoding/json" | ||
| "strings" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/kernel/kernel-images/server/lib/events" | ||
| oapi "github.com/kernel/kernel-images/server/lib/oapi" | ||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| func TestEventLifecycle(t *testing.T) { | ||
| t.Parallel() | ||
| ctx := context.Background() | ||
| svc := newTestService(t, newMockRecordManager()) | ||
|
|
||
| // Start a capture session. | ||
| startResp, err := svc.StartCaptureSession(ctx, oapi.StartCaptureSessionRequestObject{}) | ||
| require.NoError(t, err) | ||
| require.IsType(t, oapi.StartCaptureSession201JSONResponse{}, startResp) | ||
|
|
||
| // Open an SSE stream (5s budget covers the three 2s selects below). | ||
| streamCtx, streamCancel := context.WithTimeout(ctx, 5*time.Second) | ||
| defer streamCancel() | ||
| streamResp, err := svc.StreamEvents(streamCtx, oapi.StreamEventsRequestObject{}) | ||
| require.NoError(t, err) | ||
| r200, ok := streamResp.(oapi.StreamEvents200TexteventStreamResponse) | ||
| require.True(t, ok) | ||
|
|
||
| // Drain SSE frames into a channel. | ||
| received := make(chan events.Envelope, 4) | ||
| go func() { | ||
| defer close(received) | ||
| rd := bufio.NewReader(r200.Body) | ||
| for { | ||
| line, err := rd.ReadString('\n') | ||
| if err != nil { | ||
| return | ||
| } | ||
| if !strings.HasPrefix(line, "data: ") { | ||
| continue | ||
| } | ||
| payload := strings.TrimSpace(strings.TrimPrefix(line, "data: ")) | ||
| var env events.Envelope | ||
| if err := json.Unmarshal([]byte(payload), &env); err != nil { | ||
| continue | ||
| } | ||
| received <- env | ||
| } | ||
| }() | ||
|
|
||
| // Publish an event. | ||
| resp, err := svc.PublishEvent(ctx, oapi.PublishEventRequestObject{ | ||
| Body: &oapi.Event{Type: "test.event"}, | ||
| }) | ||
| require.NoError(t, err) | ||
| assert.IsType(t, oapi.PublishEvent200Response{}, resp) | ||
|
|
||
| // Verify the published event arrives on the stream. | ||
| select { | ||
| case env := <-received: | ||
| assert.Equal(t, "test.event", env.Event.Type) | ||
| case <-time.After(2 * time.Second): | ||
| t.Fatal("timed out waiting for test.event") | ||
| } | ||
|
|
||
| // Stop the session. | ||
| stopResp, err := svc.StopCaptureSession(ctx, oapi.StopCaptureSessionRequestObject{}) | ||
| require.NoError(t, err) | ||
| assert.IsType(t, oapi.StopCaptureSession200JSONResponse{}, stopResp) | ||
|
|
||
| // Verify session_ended arrives on the stream. | ||
| select { | ||
| case env := <-received: | ||
| assert.Equal(t, events.TypeSessionEnded, env.Event.Type) | ||
| case <-time.After(2 * time.Second): | ||
| t.Fatal("timed out waiting for session_ended") | ||
| } | ||
|
|
||
| // Verify the stream closes after session_ended. | ||
| select { | ||
| case _, open := <-received: | ||
| assert.False(t, open, "stream should be closed after session_ended") | ||
| case <-time.After(2 * time.Second): | ||
| t.Fatal("timed out waiting for stream to close") | ||
| } | ||
| } |


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we add one lifecycle test for the new event flow? Something like start capture session, publish an event through
PublishEvent, read it fromStreamEvents, then stop and verify the stream receivessession_ended.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4cb8f0e