From 56d0a16bcfa6c1b1ab3293cc801aee5cfe0a5c0c Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Fri, 24 Apr 2026 19:07:46 +0000 Subject: [PATCH 1/2] feat: add plan --- plans/s2-storage.md | 129 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) create mode 100644 plans/s2-storage.md diff --git a/plans/s2-storage.md b/plans/s2-storage.md new file mode 100644 index 00000000..0a205440 --- /dev/null +++ b/plans/s2-storage.md @@ -0,0 +1,129 @@ +# Browser Events Durable Storage Plan + +--- + +## Overview + +Browser Events captured by the image server (CDP, live view, computer control, captcha) are already written to per-category JSONL files and streamed over SSE. This plan adds a third sink: a cloud append-only log store. + +--- + +## System Context + +``` +[CDP Monitor] ──┐ +[Computer API] ──┤─► CaptureSession.Publish ──► RingBuffer (fan-out) +[Extension] ──┤ │ +[Live View] ──┘ ┌───────────────┼──────────────┐ + │ │ │ + FileWriter SSE handler EventsStorageWriter ◄─ (new) + (local) (real-time) (durable) +``` + +All three sinks consume from the same ring buffer. The ring buffer is non-blocking: writers never wait for any sink. Each sink holds an independent `Reader` cursor. + +--- + +## Key Components + +### `EventsStorageWriter` (`eventsstorage.go`) + +The single goroutine that moves events from the ring to the configured backend. Single-use: `Run(ctx)` blocks until ctx is cancelled, then returns. `Close()` drains in-flight writes and tears down the backend. + +### `EventsStorage` (interface in `eventsstorage.go`) + +```go +type EventsStorage interface { + // Append writes data to the named stream. The S2 backend relies on + // the basin's create-stream-on-append feature. + Append(ctx context.Context, streamName string, data []byte) error + // Close flushes pending writes and releases resources. + Close() error +} +``` + +The interface boundary between `EventsStorageWriter` and any specific backend. The mock implementation used in tests lives exclusively in `eventsstorage_writer_test.go`. + +### S2 Storage (`s2storage.go`) + +The production `EventsStorage` backed by S2. Lazily creates one S2 producer per capture session ID. The producer map is mutex-protected; `Append` is called serially from `EventsStorageWriter.Run`, but ack goroutines run concurrently. + +**Producer lifecycle:** Producers are evicted when their capture session ends via `Remove(streamName string)`, called from the `POST /events/stop` handler. This prevents unbounded accumulation of producers across session cycles on long-running servers. + +### `s2Producer` + +Bundles one `s2.Producer` with a `sync.WaitGroup` that tracks in-flight ack goroutines. `Close()` calls `wg.Wait()` before closing the producer, ensuring no ack is orphaned. + +--- + +## File Structure + +``` +server/lib/events/ + eventsstorage.go # EventsStorage interface + EventsStorageWriter + eventsstorage_writer_test.go # Tests via mockBackend — no S2 dependency + s2storage.go # S2 implementation of EventsStorage +``` + +--- + +## Architectural Decisions + +### 1. Stream name = capture session ID + +Each capture session maps to a dedicated stream named by the session UUID. Streams are created automatically on first write (S2 does this via create-stream-on-append basin feature). This means: + +- Replaying a session = reading one stream from seq 0 +- Concurrent sessions write to separate streams with no coordination + +### 2. 
Lazy producer creation with session-end eviction (S2 backend) + +Producers are created on first `Append` for a given stream name and cached until the session ends. The `s2storage` exposes a `Remove(streamName)` method that drains and closes the producer for that stream. `POST /events/stop` calls `Remove` after the session is torn down. This prevents the producer map from growing unbounded on long-running servers that cycle through many capture sessions. + +### 3. Batching: 100ms linger / 50 records (S2 backend) + +The S2 SDK batcher coalesces records before flushing to the network. Configuration: + +``` +Linger: 100ms +MaxRecords: 50 +``` + +These are independent of the ring buffer read loop — the writer appends one record per ring Read, and the batcher decides when to flush. + +### 4. Feedback loop prevention + +`EventsStorageWriter.Run` skips envelopes whose `Event.Type == EventsStorageError`. Without this, an error event would re-enter the ring and be read by the `EventsStorageWriter`, causing churn. The constant `EventsStorageError` is defined in `event.go` and used in both the writer and the error-emit path to prevent typo-driven breakage. + +### 5. 1MB record size limit and truncation + +The S2 1MB per-record limit is enforced at `CaptureSession.Publish`, not at the EventsStorageWriter. `truncateIfNeeded` nulls `event.data` and sets `event.truncated=true` when the marshalled envelope exceeds `maxRecordBytes`. This ensures truncation applies equally to file logs and durable records. + +### 6. Shutdown sequencing + +Shutdown must be strictly ordered to avoid writing to a closed `FileWriter`: + +``` +1. ctx cancelled (SIGINT/SIGTERM) +2. EventsStorageWriter.Run returns (reader unblocks from cancelled ctx) +3. storageDone channel closes +4. storageWriter.Close() — drains in-flight S2 writes +5. apiService.Shutdown() — closes CaptureSession (and its FileWriter) +``` + +--- + +## Testing + +`EventsStorageWriter` is tested exclusively through `mockBackend` (defined in `eventsstorage_writer_test.go`). Test cases cover: + +| Scenario | What is verified | +| --- | --- | +| Normal append | Records routed to correct stream, deserialise back to `Envelope` | +| Ring buffer overflow (dropped) | Writer logs warning and skips; no crash | +| `Append` error | `publishFn` receives exactly one `system_durable_error` event | +| Context cancelled | `Run` returns `nil` (clean shutdown) | +| `EventsStorageError` skipped | Error events not re-submitted, preventing feedback loops | +| Marshal failure (oversized) | Writer skips and continues; next event is processed normally | + +--- \ No newline at end of file From 676dbd72c75bf1172a25559612e476968e0b2eee Mon Sep 17 00:00:00 2001 From: Archan Datta Date: Mon, 4 May 2026 17:39:13 +0000 Subject: [PATCH 2/2] update s2 storage plan --- plans/s2-storage.md | 96 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 91 insertions(+), 5 deletions(-) diff --git a/plans/s2-storage.md b/plans/s2-storage.md index 0a205440..e51743d0 100644 --- a/plans/s2-storage.md +++ b/plans/s2-storage.md @@ -82,12 +82,12 @@ Producers are created on first `Append` for a given stream name and cached until ### 3. Batching: 100ms linger / 50 records (S2 backend) -The S2 SDK batcher coalesces records before flushing to the network. Configuration: +The S2 SDK batcher coalesces records before flushing to the network.
Both values are configurable via environment variables with the defaults below: -``` -Linger: 100ms -MaxRecords: 50 -``` +| Env var | Default | Description | +| --- | --- | --- | +| `S2_BATCHER_LINGER_MS` | `100` | Flush delay in milliseconds | +| `S2_BATCHER_MAX_RECORDS` | `50` | Max records per batch | These are independent of the ring buffer read loop — the writer appends one record per ring Read, and the batcher decides when to flush. @@ -113,6 +113,92 @@ Shutdown must be strictly ordered to avoid writing to a closed `FileWriter`: --- +## API Surface Changes + +The storage writer runs as a background goroutine and is **not part of the OpenAPI surface**. Only two existing endpoints change behaviour; all others are unaffected. + +| Endpoint | Service | Change | +| --- | --- | --- | +| `POST /events/stop` | image server | After stopping, call `s2storage.Remove(captureSessionID)` to evict the producer. No request/response change. Requires a new `CaptureSession.CurrentID() string` accessor. | +| `POST /events/capture_session` | kernel server | After the image server returns a capture session ID, the kernel server writes that ID to `sessions.s2_stream`. No request/response change to the image server endpoint itself. | + +--- + +## Wiring into `main.go` + +Three additions to `main.go`: + +**1. Conditional construction**: S2 is enabled when `S2_BASIN` and `S2_TOKEN` are both present. If either is empty the storage writer is not started and the server behaves exactly as before. + +``` +config.S2Basin != "" && config.S2Token != "" → build s2storage + EventsStorageWriter +otherwise → no-op (nil storageWriter) +``` + +**2. Goroutine launch**: immediately after the HTTP servers are started: + +```go +storageDone := make(chan struct{}) +go func() { + defer close(storageDone) + storageWriter.Run(ctx) // blocks until ctx cancelled +}() +``` + +**3. Shutdown ordering**: the existing `errgroup` in main already waits for `apiService.Shutdown`. The storage writer must drain before that: + +``` +ctx cancelled + → storageWriter.Run returns (ring reader unblocks) + → storageDone closes + → storageWriter.Close() (drains in-flight S2 acks) + → apiService.Shutdown() (closes CaptureSession + FileWriter) +``` + +This is implemented by waiting on `storageDone` before calling `apiService.Shutdown`, outside the errgroup. + +--- + +## Credentials and Configuration + +**In the image server (**`config.go`**):** + +```go +S2Basin string `envconfig:"S2_BASIN" default:""` +S2Token string `envconfig:"S2_TOKEN" default:""` +S2BatcherLingerMs int `envconfig:"S2_BATCHER_LINGER_MS" default:"100"` +S2BatcherMaxRecs int `envconfig:"S2_BATCHER_MAX_RECORDS" default:"50"` +``` + +**In the VM (infra):** + +| Vault variable | Purpose | +| --- | --- | +| `vault_s2_events_basin_production` | S2 basin for browser event capture (prod) | +| `vault_s2_events_basin_staging` | S2 basin for browser event capture (staging) | +| `vault_s2_events_token_production` | Access token for the above basin (prod) | +| `vault_s2_events_token_staging` | Access token for the above basin (staging) | + +--- + +## Session Discoverability + +### New column: `s2_stream` on `Sessions` + +The capture session ID (= S2 stream name) must survive after the image server container is destroyed so that callers can replay events later. The kernel server stores it on the `Sessions` row at capture session start. 
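+
+As a rough illustration of that write path, the sketch below shows the kind of update the kernel server could issue (names are illustrative; imports and client wiring are omitted, and the setter assumes ent's usual codegen for the `s2_stream` field defined in the schema change below):
+
+```go
+// recordS2Stream persists the capture session ID returned by
+// POST /events/capture_session onto the active Sessions row.
+func recordS2Stream(ctx context.Context, sess *ent.Session, captureSessionID string) error {
+	return sess.Update().
+		SetS2Stream(captureSessionID).
+		Exec(ctx)
+}
+```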
+ +**Schema change** (mirrors `replay_prefix` in `kernel/packages/api/ent/schema/session.go`): + +```go +field.String("s2_stream").Optional() +``` + +**Write path** — after `POST /events/capture_session` succeeds, the kernel server sets `s2_stream` to the returned capture session ID on the active `Sessions` row. + +**Read path** — to replay a session, query `sessions.s2_stream` for the session ID, then read the S2 stream by that name from seq 0. + +--- + ## Testing `EventsStorageWriter` is tested exclusively through `mockBackend` (defined in `eventsstorage_writer_test.go`). Test cases cover:
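+
+The mock that backs these tests can stay very small. A minimal sketch of a test double satisfying `EventsStorage` follows (illustrative only; the real `mockBackend` in `eventsstorage_writer_test.go` may differ, and imports are omitted):
+
+```go
+// mockBackend records appends in memory, keyed by stream name, and can
+// simulate Append failures via appendErr.
+type mockBackend struct {
+	mu        sync.Mutex
+	appendErr error               // optional injected failure
+	records   map[string][][]byte // stream name -> appended payloads
+}
+
+func (m *mockBackend) Append(ctx context.Context, streamName string, data []byte) error {
+	if m.appendErr != nil {
+		return m.appendErr
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.records == nil {
+		m.records = make(map[string][][]byte)
+	}
+	// Copy the payload: the writer may reuse its marshalling buffer.
+	m.records[streamName] = append(m.records[streamName], append([]byte(nil), data...))
+	return nil
+}
+
+func (m *mockBackend) Close() error { return nil }
+```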