Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,47 @@
# Changelog

All notable changes to CKB will be documented in this file.

## [Unreleased]

### Added

- **`explainFile` surfaces semantically-related symbols** via LIP v2.1's
`stream_context` RPC (`internal/query/lip_stream_context.go`). The daemon
ranks symbols across the whole file within a 2048-token budget; CKB
returns the top 10 in the new `facts.related` field with per-symbol
relevance and token cost. Gated on the handshake's `supported_messages`
— older daemons fall through and the field is absent. New streaming
transport `internal/lip/stream_context.go` reads the daemon's N
`symbol_info` frames plus the `end_stream` terminator; previous LIP
client was unary-only.
- **`searchSymbols` expands short queries** via LIP's `query_expansion`
RPC (`internal/query/query_expansion.go`). Queries of ≤ 2 tokens get up
to 5 related terms appended before hitting FTS5, recovering recall on
vocabulary-mismatch misses ("auth" → "authenticate authorization
principal…"). Gated on the handshake and on the same mixed-models flag
that protects the rerank path. Longer queries are passed through
unchanged — the expansion is a rescue, not a rewrite.
- **Semantic hits carry evidence chunks** when LIP v2.0+'s `explain_match`
is advertised (`SemanticSearchWithLIPExplained` in
`internal/query/lip_ranker.go`). Each hit returned by the semantic
fallback path now includes up to two ranked chunks with line ranges,
text, and per-chunk scores — the caller can cite specific lines instead
of a bare file URL. Capped at the top-5 hits to bound round-trip cost.
- **`lip.Handshake` runs on engine startup** and the daemon's
`supported_messages` list is stashed for feature gating
(`Engine.lipSupports`). The daemon version and supported-count are
logged on connect.

### Changed

- **`lipFileURI` path normalisation** — the helper that builds
`file://`-URIs for LIP requests used to naive-`filepath.Join` whatever
`Location.FileId` a backend supplied. Now handles absolute paths and
already-prefixed `file://` URIs without producing malformed results
like `file:///repo//abs/path`. Backends today return relative paths, so
this is a hardening fix for contracts that are nominally open.

### Changed

- **LIP health: push-driven, not polled** — the Engine now opens a long-lived
Expand Down
15 changes: 12 additions & 3 deletions internal/lip/client.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Package lip provides a best-effort client for the LIP (Liz Indexing Protocol)
// local socket. All operations degrade silently when LIP is not running —
// callers must never treat LIP unavailability as a fatal error.
Expand Down Expand Up @@ -41,6 +41,10 @@
type HandshakeInfo struct {
DaemonVersion string `json:"daemon_version"`
ProtocolVersion int `json:"protocol_version"`
// SupportedMessages is the snake_case `type` tag list the daemon
// understands. Empty when talking to a pre-v1.5 daemon that omits the
// field — callers should fall back to ProtocolVersion comparisons.
SupportedMessages []string `json:"supported_messages"`
}

// IndexStatusInfo is the public view of LIP index health.
Expand Down Expand Up @@ -158,8 +162,9 @@
}

type handshakeResp struct {
DaemonVersion string `json:"daemon_version"`
ProtocolVersion int `json:"protocol_version"`
DaemonVersion string `json:"daemon_version"`
ProtocolVersion int `json:"protocol_version"`
SupportedMessages []string `json:"supported_messages"`
}

type similarityResp struct {
Expand Down Expand Up @@ -387,7 +392,7 @@

// FindSemanticCounterpart finds the top-k candidates most semantically similar
// to the source URI. Useful for finding test files that cover an implementation.
func FindSemanticCounterpart(uri string, candidates []string, topK int) ([]NearestResult, error) {

Check notice on line 395 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function FindSemanticCounterpart (complexity: 2) [ckb/test-gaps/no-name-match]
result, _ := lipRPC(
map[string]any{"type": "find_semantic_counterpart", "uri": uri, "candidates": candidates, "top_k": topK},
500*time.Millisecond, 4<<20,
Expand All @@ -400,7 +405,7 @@

// NearestInStore performs nearest-neighbour against a caller-provided embedding
// map (keyed by URI). Enables cross-repo federation via ExportEmbeddings.
func NearestInStore(uri string, store map[string][]float32, topK int) ([]NearestResult, error) {

Check notice on line 408 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function NearestInStore (complexity: 2) [ckb/test-gaps/no-name-match]
result, _ := lipRPC(
map[string]any{"type": "query_nearest_in_store", "uri": uri, "store": store, "top_k": topK},
2*time.Second, 32<<20,
Expand Down Expand Up @@ -530,7 +535,7 @@
return lipRPC(
map[string]any{"type": "query_coverage", "root": root},
500*time.Millisecond, 1<<20,
func(r coverageResp) *CoverageInfo {

Check notice on line 538 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function <anonymous> (complexity: 2) [ckb/test-gaps/no-name-match]
cov := float32(0)
if r.CoverageFraction != nil {
cov = *r.CoverageFraction
Expand Down Expand Up @@ -664,7 +669,7 @@
return lipRPC(
map[string]any{"type": "query_index_status"},
200*time.Millisecond, 4<<10,
func(r indexStatusResp) *IndexStatusInfo {

Check notice on line 672 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function <anonymous> (complexity: 2) [ckb/test-gaps/no-name-match]
lastUpdated := ""
if r.LastUpdatedMs != nil {
lastUpdated = time.UnixMilli(*r.LastUpdatedMs).UTC().Format(time.RFC3339)
Expand All @@ -685,7 +690,7 @@
return lipRPC(
map[string]any{"type": "query_file_status", "uri": uri},
200*time.Millisecond, 4<<10,
func(r fileStatusResp) *FileStatusInfo {

Check notice on line 693 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function <anonymous> (complexity: 1) [ckb/test-gaps/no-name-match]
return &FileStatusInfo{
Indexed: r.Indexed,
AgeSeconds: r.AgeSeconds,
Expand All @@ -710,7 +715,7 @@

// ReindexFiles forces a re-index of specific file URIs from disk. Fire-and-forget:
// any LIP error is silently discarded.
func ReindexFiles(uris []string) {

Check notice on line 718 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function ReindexFiles (complexity: 2) [ckb/test-gaps/no-name-match]
if len(uris) == 0 {
return
}
Expand Down Expand Up @@ -762,7 +767,7 @@
result, _ := lipRPC(
map[string]any{"type": "batch_annotation_get", "uris": uris, "key": key},
timeout, 4<<20,
func(r batchAnnotationResp) *map[string]string {

Check notice on line 770 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function <anonymous> (complexity: 4) [ckb/test-gaps/no-name-match]
out := make(map[string]string, len(r.Entries))
for k, v := range r.Entries {
if v != nil {
Expand Down Expand Up @@ -790,7 +795,11 @@
}
return lipRPC(req, 200*time.Millisecond, 4<<10,
func(r handshakeResp) *HandshakeInfo {
return &HandshakeInfo{DaemonVersion: r.DaemonVersion, ProtocolVersion: r.ProtocolVersion}
return &HandshakeInfo{
DaemonVersion: r.DaemonVersion,
ProtocolVersion: r.ProtocolVersion,
SupportedMessages: r.SupportedMessages,
}
})
}

Expand All @@ -801,7 +810,7 @@
// lipRPC is the shared transport for request→response LIP calls.
// T is the JSON response type; U is the public return type.
// Returns (nil, nil) on any error — callers treat nil as "LIP unavailable".
func lipRPC[T any, U any](req any, timeout time.Duration, maxRespBytes uint32, convert func(T) *U) (*U, error) {

Check notice on line 813 in internal/lip/client.go

View workflow job for this annotation

GitHub Actions / PR Review

Untested function lipRPC (complexity: 7) [ckb/test-gaps/no-name-match]
conn, err := net.DialTimeout("unix", SocketPath(), 100*time.Millisecond)
if err != nil {
return nil, nil
Expand Down
148 changes: 148 additions & 0 deletions internal/lip/stream_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package lip

import (
"encoding/json"
"io"
"net"
"time"
)

// StreamContextSymbol is one frame of a StreamContext response. The
// embedded `OwnedSymbolInfo` is flattened into the fields we actually
// consume in CKB — the full Rust struct carries many fields we don't
// need (telemetry, relationships, taint) and serialising them through
// `map[string]any` would be wasteful.
type StreamContextSymbol struct {
URI string `json:"uri"`
DisplayName string `json:"display_name"`
Kind string `json:"kind"`
RelevanceScore float32 `json:"relevance_score"`
TokenCost uint32 `json:"token_cost"`
}

// StreamContextResult summarises a completed StreamContext stream.
// `Reason` is one of "token_budget" | "exhausted" | "error".
type StreamContextResult struct {
Symbols []StreamContextSymbol
Reason string
Emitted uint32
TotalCandidates uint32
Err string
}

// StreamContextPosition is the cursor rectangle the daemon ranks around.
// Byte-offset semantics match LIP's `OwnedRange` — 0-based lines and
// chars. Pass a zero-width range at the cursor, or a whole-file range
// (`start_line=0, end_line=lineCount`) for file-level context.
type StreamContextPosition struct {
StartLine int `json:"start_line"`
StartChar int `json:"start_char"`
EndLine int `json:"end_line"`
EndChar int `json:"end_char"`
}

// streamContextMaxFrames caps how many SymbolInfo frames we accept before
// bailing out — defence against a runaway daemon. Large indexes could
// theoretically produce 10k+ candidates; a hard cap of 1024 is well above
// any realistic caller budget and cheap to enforce.
const streamContextMaxFrames = 1024

// StreamContext opens a dedicated connection, sends a `stream_context`
// request, and reads SymbolInfo frames until the daemon writes the
// `end_stream` terminator. Returns (nil, nil) when the daemon is
// unavailable — callers must treat nil as "LIP unavailable" (same contract
// as the rest of the package).
//
// The dedicated connection is intentional: `stream_context` on the
// shared subscriber channel would interleave with IndexStatus pings and
// IndexChanged pushes and complicate parsing. One connection per call is
// fine — the ranking itself dominates latency, and callers shouldn't
// issue this RPC more than a few times per second.
func StreamContext(fileURI string, pos StreamContextPosition, maxTokens uint32, model string) (*StreamContextResult, error) {
conn, err := net.DialTimeout("unix", SocketPath(), 500*time.Millisecond)
if err != nil {
return nil, nil
}
defer conn.Close()
// Overall deadline: the daemon's relevance ranking is heuristic and
// bounded, but pathological inputs could stall. 10 s is generous; for
// a token_budget of ~2000 it completes in ~200 ms typically.
_ = conn.SetDeadline(time.Now().Add(10 * time.Second))

req := map[string]any{
"type": "stream_context",
"file_uri": fileURI,
"cursor_position": pos,
"max_tokens": maxTokens,
}
if model != "" {
req["model"] = model
}
if err := writeFrame(conn, req); err != nil {
return nil, nil
}

out := &StreamContextResult{Symbols: make([]StreamContextSymbol, 0, 16)}
for range streamContextMaxFrames + 1 {
frame, err := readFrame(conn)
if err != nil {
if err == io.EOF {
return out, nil
}
return nil, nil
}
// ServerResponse { ok: ServerMessage, error: Option<String> }
inner := frame
if raw, ok := frame["ok"]; ok && len(raw) > 0 && string(raw) != "null" {
_ = json.Unmarshal(raw, &inner)
}
var kind string
_ = json.Unmarshal(inner["type"], &kind)

switch kind {
case "symbol_info":
var sym struct {
SymbolInfo struct {
URI string `json:"uri"`
DisplayName string `json:"display_name"`
Kind string `json:"kind"`
} `json:"symbol_info"`
RelevanceScore float32 `json:"relevance_score"`
TokenCost uint32 `json:"token_cost"`
}
if b, ok := marshalInner(inner); ok {
_ = json.Unmarshal(b, &sym)
}
out.Symbols = append(out.Symbols, StreamContextSymbol{
URI: sym.SymbolInfo.URI,
DisplayName: sym.SymbolInfo.DisplayName,
Kind: sym.SymbolInfo.Kind,
RelevanceScore: sym.RelevanceScore,
TokenCost: sym.TokenCost,
})
case "end_stream":
var end struct {
Reason string `json:"reason"`
Emitted uint32 `json:"emitted"`
TotalCandidates uint32 `json:"total_candidates"`
Error *string `json:"error"`
}
if b, ok := marshalInner(inner); ok {
_ = json.Unmarshal(b, &end)
}
out.Reason = end.Reason
out.Emitted = end.Emitted
out.TotalCandidates = end.TotalCandidates
if end.Error != nil {
out.Err = *end.Error
}
return out, nil
case "error", "unknown_message":
// Daemon rejected the request — treat as unavailable.
return nil, nil
default:
// Unknown frame type mid-stream: skip rather than fail hard.
}
}
return out, nil
}
143 changes: 143 additions & 0 deletions internal/lip/stream_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package lip

import (
"encoding/binary"
"encoding/json"
"io"
"net"
"os"
"path/filepath"
"testing"
"time"
)

// startStreamContextDaemon spins up a Unix socket that responds to any
// request with `frames` in order and then closes. Tests inject the full
// frame sequence they want to exercise — the fake is dumb so behaviour
// under malformed input is exercised by the real daemon's tests, not
// CKB's.
func startStreamContextDaemon(t *testing.T, frames []map[string]any) {
t.Helper()
dir, err := os.MkdirTemp("/tmp", "lipstream")
if err != nil {
t.Fatalf("mkdirtemp: %v", err)
}
sock := filepath.Join(dir, "s.sock")
ln, err := net.Listen("unix", sock)
if err != nil {
os.RemoveAll(dir)
t.Fatalf("listen: %v", err)
}
prev := os.Getenv("LIP_SOCKET")
os.Setenv("LIP_SOCKET", sock)
t.Cleanup(func() {
ln.Close()
os.RemoveAll(dir)
os.Setenv("LIP_SOCKET", prev)
})

go func() {
conn, err := ln.Accept()
if err != nil {
return
}
defer conn.Close()
// Drain the incoming stream_context request.
_ = conn.SetReadDeadline(time.Now().Add(2 * time.Second))
var lenBuf [4]byte
if _, err := io.ReadFull(conn, lenBuf[:]); err != nil {
return
}
reqLen := binary.BigEndian.Uint32(lenBuf[:])
if _, err := io.CopyN(io.Discard, conn, int64(reqLen)); err != nil {
return
}
for _, f := range frames {
b, _ := json.Marshal(f)
var out [4]byte
binary.BigEndian.PutUint32(out[:], uint32(len(b)))
if _, err := conn.Write(out[:]); err != nil {
return
}
if _, err := conn.Write(b); err != nil {
return
}
}
}()
}

func TestStreamContext_ParsesSymbolInfoThenEndStream(t *testing.T) {
startStreamContextDaemon(t, []map[string]any{
{
"type": "symbol_info",
"symbol_info": map[string]any{
"uri": "file:///repo/foo.go",
"display_name": "Foo",
"kind": "function",
},
"relevance_score": 0.8,
"token_cost": 120,
},
{
"type": "symbol_info",
"symbol_info": map[string]any{
"uri": "file:///repo/bar.go",
"display_name": "Bar",
"kind": "struct",
},
"relevance_score": 0.6,
"token_cost": 80,
},
{
"type": "end_stream",
"reason": "token_budget",
"emitted": 2,
"total_candidates": 17,
},
})

res, err := StreamContext("file:///repo/foo.go", StreamContextPosition{EndLine: 100}, 1024, "")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if res == nil {
t.Fatal("nil result, want 2 symbols")
}
if len(res.Symbols) != 2 {
t.Fatalf("symbols = %d, want 2", len(res.Symbols))
}
if res.Symbols[0].DisplayName != "Foo" || res.Symbols[0].RelevanceScore != 0.8 {
t.Errorf("symbol[0] = %+v", res.Symbols[0])
}
if res.Reason != "token_budget" || res.Emitted != 2 || res.TotalCandidates != 17 {
t.Errorf("terminator mismatch: %+v", res)
}
}

func TestStreamContext_DaemonUnavailableReturnsNil(t *testing.T) {
prev := os.Getenv("LIP_SOCKET")
os.Setenv("LIP_SOCKET", "/tmp/ckb-lip-stream-nonexistent.sock")
t.Cleanup(func() { os.Setenv("LIP_SOCKET", prev) })

res, err := StreamContext("file:///repo/foo.go", StreamContextPosition{}, 1024, "")
if err != nil {
t.Fatalf("err = %v, want nil (silent degradation contract)", err)
}
if res != nil {
t.Fatalf("res = %+v, want nil", res)
}
}

func TestStreamContext_ErrorFrameAborts(t *testing.T) {
startStreamContextDaemon(t, []map[string]any{
{
"type": "error",
"message": "cursor out of range",
"code": "cursor_out_of_range",
},
})
res, _ := StreamContext("file:///repo/foo.go", StreamContextPosition{EndLine: 9999}, 1024, "")
if res != nil {
t.Fatalf("res = %+v, want nil on error frame", res)
}
}
7 changes: 7 additions & 0 deletions internal/query/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,17 @@ type Engine struct {
// connection open and receives `index_changed` pushes plus per-ping health
// snapshots. `lipHealthCheckedAt` is zero until the first frame arrives —
// callers check it before trusting the flags.
//
// `lipSupported` is the set of `type` tags the daemon advertised in its
// handshake. It gates calls to newer RPCs (StreamContext, ExplainMatch,
// ...) on clients talking to an older daemon, instead of letting them
// dispatch and get back an UnknownMessage. Empty when the handshake has
// not yet completed or the daemon predates `supported_messages`.
lipHealthMu sync.RWMutex
cachedLipMixed bool
cachedLipAvailable bool
lipHealthCheckedAt time.Time
lipSupported map[string]struct{}
lipSubCancel context.CancelFunc

// Cache stats
Expand Down
Loading
Loading