Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/secscan.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ jobs:
steps:
- name: Checkout Source
uses: actions/checkout@v6
if: ${{ github.actor != 'dependabot[bot]' }}
if: ${{ !github.repository.fork && github.actor != 'dependabot[bot]' }}
- name: Run Gosec Security Scanner
if: ${{ github.actor != 'dependabot[bot]' }}
if: ${{ !github.repository.fork && github.actor != 'dependabot[bot]' }}
uses: securego/gosec@v2.27.1
with:
# We let the report content trigger a failure using the GitHub Security features.
args: '-no-fail -fmt sarif -out results.sarif ./...'
- name: Upload SARIF file
if: ${{ github.actor != 'dependabot[bot]' }}
if: ${{ !github.repository.fork && github.actor != 'dependabot[bot]' }}
uses: github/codeql-action/upload-sarif@v4
with:
# Path to SARIF file relative to the root of the repository
Expand Down
2 changes: 1 addition & 1 deletion core/http/endpoints/localai/backend_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func BackendMonitorEndpoint(bm *monitoring.BackendMonitorService) echo.HandlerFu
return echo.NewHTTPError(400, "model query parameter is required")
}

resp, err := bm.CheckAndSample(model)
resp, err := bm.CheckAndSample(c.Request().Context(), model)
if err != nil {
return err
}
Expand Down
30 changes: 30 additions & 0 deletions core/http/endpoints/localai/config_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,36 @@ func AutocompleteEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, a
}
}

// GetConfigEndpoint builds the handler that serves the YAML and JSON view of
// an installed model's configuration, as produced by the modeladmin config
// service.
//
// Intended consumers (per the original author's note): the MCP httpapi.Client
// for get_model_config, and the React model editor when it wants the on-disk
// view rather than the in-memory loader copy that has SetDefaults applied.
//
// The ":name" path parameter is percent-decoded when it is valid
// percent-encoding; otherwise the raw value is used unchanged. Service errors
// are reported as {"error": <message>} with the status chosen by
// httpStatusForModelAdminError.
// @Summary Read a model configuration from disk
// @Description Returns the raw YAML and parsed JSON view of an installed model's config file
// @Tags config
// @Produce json
// @Param name path string true "Model name"
// @Success 200 {object} map[string]any "{name, yaml, json}"
// @Router /api/models/config-yaml/{name} [get]
func GetConfigEndpoint(cl *config.ModelConfigLoader, appConfig *config.ApplicationConfig) echo.HandlerFunc {
	configSvc := modeladmin.NewConfigService(cl, appConfig)

	return func(c echo.Context) error {
		// Best-effort decode: a malformed escape sequence keeps the raw segment.
		name := c.Param("name")
		unescaped, unescapeErr := url.PathUnescape(name)
		if unescapeErr == nil {
			name = unescaped
		}

		view, err := configSvc.GetConfig(c.Request().Context(), name)
		if err != nil {
			status := httpStatusForModelAdminError(err)
			return c.JSON(status, map[string]any{"error": err.Error()})
		}

		body := map[string]any{
			"name": view.Name,
			"yaml": view.YAML,
			"json": view.JSON,
		}
		return c.JSON(http.StatusOK, body)
	}
}

// PatchConfigEndpoint handles PATCH requests to partially update a model config
// using nested JSON merge.
// @Summary Partially update a model configuration
Expand Down
2 changes: 1 addition & 1 deletion core/http/endpoints/localai/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func LocalAIMetricsAPIMiddleware(metrics *monitoring.LocalAIMetricsService) echo
start := time.Now()
err := next(c)
elapsed := float64(time.Since(start)) / float64(time.Second)
cfg.metricsService.ObserveAPICall(method, path, elapsed)
cfg.metricsService.ObserveAPICall(c.Request().Context(), method, path, elapsed)
return err
}
}
Expand Down
154 changes: 151 additions & 3 deletions core/http/endpoints/openai/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@ package openai

import (
"context"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"math"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -252,16 +256,137 @@ var upgrader = websocket.Upgrader{
},
}

// TODO: Implement ephemeral keys to allow these endpoints to be used
// ephemeralSessionKeyTTL is the lifetime of a short-lived session token
// issued by POST /v1/realtime/sessions. The token is presented during the
// WebSocket handshake to /v1/realtime, so a client that holds a regular API
// key at session-creation time can open the socket without re-sending that
// key — matching the OpenAI realtime API shape.
//
// NOTE(review): validateEphemeralSessionKey is stateless (signature + expiry
// only), so a token is not single-use: it can be presented repeatedly until
// it expires. Keep the TTL short, or add a used-token store if one-shot
// semantics are required.
const ephemeralSessionKeyTTL = 60 * time.Second

// ephemeralSessionKeyPrefix marks a string as an ephemeral session key.
const ephemeralSessionKeyPrefix = "lai-sess:"

// signEphemeralPayload returns the hex-encoded HMAC-SHA256 of payload keyed
// with hmacSecret. Issuing and validation both go through this helper so the
// two sides cannot drift apart.
func signEphemeralPayload(hmacSecret, payload string) string {
	mac := hmac.New(sha256.New, []byte(hmacSecret))
	mac.Write([]byte(payload)) // hash.Hash.Write never returns an error
	return hex.EncodeToString(mac.Sum(nil))
}

// generateEphemeralSessionKey issues a signed, self-describing session token
// of the form
//
//	lai-sess:<nonce>:<expiry>:<userID>:<signature>
//
// where nonce is 32 random bytes in hex, expiry is the expiry time in Unix
// seconds (now + ephemeralSessionKeyTTL), userID is embedded verbatim (it may
// be empty and may itself contain ':'), and signature is the hex HMAC-SHA256
// of "<nonce>:<expiry>:<userID>" keyed with hmacSecret.
//
// It returns the token together with its expiry time (UTC), or an error when
// the random source fails.
//
// NOTE(review): an empty hmacSecret still yields a token, but its signature
// can then be recomputed by anyone; callers should make sure a secret is
// configured wherever the token is used for authentication.
func generateEphemeralSessionKey(hmacSecret, userID string) (string, time.Time, error) {
	nonce := make([]byte, 32)
	if _, err := rand.Read(nonce); err != nil {
		return "", time.Time{}, fmt.Errorf("failed to generate session key: %w", err)
	}

	expiry := time.Now().UTC().Add(ephemeralSessionKeyTTL)
	payload := hex.EncodeToString(nonce) + ":" + strconv.FormatInt(expiry.Unix(), 10) + ":" + userID

	token := ephemeralSessionKeyPrefix + payload + ":" + signEphemeralPayload(hmacSecret, payload)
	return token, expiry, nil
}

// validateEphemeralSessionKey verifies the HMAC signature and expiry of a
// token produced by generateEphemeralSessionKey. On success it returns the
// embedded userID (empty for anonymous sessions); otherwise it returns an
// error describing why the token was rejected.
//
// The signature is checked before any field of the payload is interpreted,
// so expiry and userID are only ever read from authenticated data.
func validateEphemeralSessionKey(token, hmacSecret string) (string, error) {
	if !strings.HasPrefix(token, ephemeralSessionKeyPrefix) {
		return "", errors.New("invalid ephemeral session key: missing prefix")
	}
	rest := token[len(ephemeralSessionKeyPrefix):]

	// rest = "<nonce>:<expiry>:<userID>:<signature>". The signature is hex
	// and therefore colon-free, so it is always the last segment; everything
	// before it is the signed payload (userID may contain ':').
	sep := strings.LastIndex(rest, ":")
	if sep == -1 {
		return "", errors.New("invalid ephemeral session key: bad format")
	}
	payload, sig := rest[:sep], rest[sep+1:]

	// Constant-time comparison avoids leaking how much of the signature matched.
	expected := signEphemeralPayload(hmacSecret, payload)
	if !hmac.Equal([]byte(sig), []byte(expected)) {
		return "", errors.New("invalid ephemeral session key: bad signature")
	}

	// Drop the nonce; what follows is "<expiry>" or "<expiry>:<userID>".
	_, afterNonce, ok := strings.Cut(payload, ":")
	if !ok {
		return "", errors.New("invalid ephemeral session key: bad format")
	}
	expiryStr, userID, _ := strings.Cut(afterNonce, ":")

	expiryUnix, err := strconv.ParseInt(expiryStr, 10, 64)
	if err != nil {
		return "", errors.New("invalid ephemeral session key: bad expiry")
	}
	if time.Now().Unix() > expiryUnix {
		return "", errors.New("ephemeral session key expired")
	}
	return userID, nil
}

// RealtimeSessions handles POST /v1/realtime/sessions. It issues a
// short-lived ephemeral token (see generateEphemeralSessionKey) that the
// client then presents during the /v1/realtime WebSocket handshake.
//
// When auth is enabled and a user is attached to the request, that user's ID
// is embedded in the token. When auth is disabled the endpoint still issues a
// token (for compatibility) with an empty user ID.
//
// NOTE(review): this handler does not itself reject unauthenticated callers
// when auth is enabled — a request without a resolved user simply gets an
// anonymous token. Confirm that the route is registered behind the auth
// middleware.
func RealtimeSessions(application *application.Application) echo.HandlerFunc {
	return func(c echo.Context) error {
		appCfg := application.ApplicationConfig()

		userID := ""
		hmacSecret := ""
		if appCfg != nil {
			hmacSecret = appCfg.Auth.APIKeyHMACSecret
			if appCfg.Auth.Enabled {
				if u := auth.GetUser(c); u != nil {
					userID = u.ID
				}
			}
		}

		token, expiresAt, err := generateEphemeralSessionKey(hmacSecret, userID)
		if err != nil {
			return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()})
		}

		// The session id is random. Deriving it from the leading digits of
		// the expiry timestamp would yield the same id for every session
		// created within the same multi-day window.
		idBytes := make([]byte, 4)
		if _, err := rand.Read(idBytes); err != nil {
			return c.JSON(http.StatusInternalServerError, map[string]string{"error": "failed to generate session id: " + err.Error()})
		}

		// Response shape is a subset of the OpenAI Realtime Session object.
		return c.JSON(http.StatusOK, map[string]any{
			"object":          "realtime.session",
			"id":              "sess_" + hex.EncodeToString(idBytes),
			"model":           "gpt-4o-realtime-preview", // placeholder — actual model is per-connection
			"ephemeral_token": token,
			"expires_at":      expiresAt.UTC().Format(time.RFC3339),
			"seconds_left":    int64(time.Until(expiresAt).Seconds()),
			"max_audio_additions": map[string]any{
				"total_tokens":  120000,
				"input_tokens":  60000,
				"output_tokens": 60000,
			},
		})
	}
}

// RealtimeTranscriptionSession handles POST /v1/realtime/transcriptions — a
// transcription-only variant of the session endpoint. It issues a token in
// the same ephemeral-session-key format (see generateEphemeralSessionKey), so
// the same validator works for both.
//
// When auth is enabled and a user is attached to the request, that user's ID
// is embedded in the token; otherwise the token carries an empty user ID.
//
// NOTE(review): this handler does not itself reject unauthenticated callers
// when auth is enabled. Confirm that the route is registered behind the auth
// middleware.
func RealtimeTranscriptionSession(application *application.Application) echo.HandlerFunc {
	return func(c echo.Context) error {
		appCfg := application.ApplicationConfig()

		userID := ""
		hmacSecret := ""
		if appCfg != nil {
			hmacSecret = appCfg.Auth.APIKeyHMACSecret
			if appCfg.Auth.Enabled {
				if u := auth.GetUser(c); u != nil {
					userID = u.ID
				}
			}
		}

		token, expiresAt, err := generateEphemeralSessionKey(hmacSecret, userID)
		if err != nil {
			return c.JSON(http.StatusInternalServerError, map[string]string{"error": err.Error()})
		}

		return c.JSON(http.StatusOK, map[string]any{
			"object":          "realtime.transcription_session",
			"ephemeral_token": token,
			"expires_at":      expiresAt.UTC().Format(time.RFC3339),
			"seconds_left":    int64(time.Until(expiresAt).Seconds()),
		})
	}
}

Expand All @@ -280,6 +405,29 @@ type RealtimeSessionOptions struct {

func Realtime(application *application.Application) echo.HandlerFunc {
return func(c echo.Context) error {
// Ephemeral session key validation. When auth is enabled, the
// client must pass a valid token (from POST /v1/realtime/sessions)
// either via `?session=` query parameter or via the Authorization
// header as `Bearer <token>`. The standard `isCurrentUserAdmin`
// path is honored when auth is disabled.
appCfg := application.ApplicationConfig()
if appCfg != nil && appCfg.Auth.Enabled {
token := c.QueryParam("session")
if token == "" {
// Fall back to Authorization: Bearer <lai-sess:...>
ah := c.Request().Header.Get("Authorization")
if strings.HasPrefix(ah, "Bearer ") {
token = strings.TrimPrefix(ah, "Bearer ")
}
}
if token == "" {
return echo.NewHTTPError(http.StatusUnauthorized, "missing ephemeral session key — call POST /v1/realtime/sessions first")
}
if _, err := validateEphemeralSessionKey(token, appCfg.Auth.APIKeyHMACSecret); err != nil {
return echo.NewHTTPError(http.StatusUnauthorized, fmt.Sprintf("invalid ephemeral session key: %s", err.Error()))
}
}

ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
return err
Expand Down
68 changes: 37 additions & 31 deletions core/http/endpoints/openai/realtime_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,39 +486,22 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model
return nil, fmt.Errorf("failed to validate config: %w", err)
}

// TODO: Do we always need a transcription model? It can be disabled. Note that any-to-any instruction following models don't transcribe as such, so if transcription is required it is a separate process
cfgSST, err := cl.LoadModelConfigFileByName(pipeline.Transcription, ml.ModelPath)
if err != nil {

return nil, fmt.Errorf("failed to load backend config: %w", err)
}

if valid, _ := cfgSST.Validate(); !valid {
return nil, fmt.Errorf("failed to validate config: %w", err)
// Transcription (SST) is optional. If the pipeline doesn't specify one
// (pipeline.Transcription == "") or the LLM is already any-to-any, we
// skip loading a separate transcription config and Transcribe/TranscribeStream
// will fall back to erroring or the any-to-any backend.
var cfgSST *config.ModelConfig
if pipeline.Transcription != "" {
c, err := cl.LoadModelConfigFileByName(pipeline.Transcription, ml.ModelPath)
if err != nil {
return nil, fmt.Errorf("failed to load transcription config: %w", err)
}
if valid, _ := c.Validate(); !valid {
return nil, fmt.Errorf("failed to validate transcription config: %w", err)
}
cfgSST = c
}

// TODO: Decide when we have a real any-to-any model
// if false {
//
// cfgAnyToAny, err := cl.LoadModelConfigFileByName(pipeline.LLM, ml.ModelPath)
// if err != nil {
//
// return nil, fmt.Errorf("failed to load backend config: %w", err)
// }
//
// if valid, _ := cfgAnyToAny.Validate(); !valid {
// return nil, fmt.Errorf("failed to validate config: %w", err)
// }
//
// return &anyToAnyModel{
// LLMConfig: cfgAnyToAny,
// VADConfig: cfgVAD,
// }, nil
// }

xlog.Debug("Loading a wrapped model")

// Otherwise we want to return a wrapped model, which is a "virtual" model that re-uses other models to perform operations
cfgLLM, err := cl.LoadModelConfigFileByName(pipeline.LLM, ml.ModelPath)
if err != nil {

Expand All @@ -529,11 +512,34 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model
return nil, fmt.Errorf("failed to validate config: %w", err)
}

// Any-to-any detection. If the LLM model declares FLAG_REALTIME_AUDIO
// (or one of the known any-to-any backends), skip the TTS + SST pipeline
// and route audio directly through the LLM.
isAnyToAny := false
if cfgLLM != nil {
isAnyToAny = cfgLLM.Backend == "liquid-audio" || cfgLLM.HasUsecases(config.FLAG_REALTIME_AUDIO)
}

// Let the pipeline set the LLM's reasoning effort and force thinking off
// (cfgLLM is a per-session copy). disable_thinking applies after the effort.
applyPipelineReasoning(cfgLLM, *pipeline)
applyPipelineThinking(cfgLLM, *pipeline)

if isAnyToAny {
xlog.Debug("Loading an any-to-any model (native AudioToAudioStream)")
return &anyToAnyModel{
LLMConfig: cfgLLM,
VADConfig: cfgVAD,

confLoader: cl,
modelLoader: ml,
appConfig: appConfig,
}, nil
}

xlog.Debug("Loading a wrapped model")

// Otherwise we want to return a wrapped model, which is a "virtual" model that re-uses other models to perform operations
cfgTTS, err := cl.LoadModelConfigFileByName(pipeline.TTS, ml.ModelPath)
if err != nil {

Expand Down
4 changes: 4 additions & 0 deletions core/http/routes/localai.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func RegisterLocalAIRoutes(router *echo.Echo,
router.POST("/models/reload", localai.ReloadModelsEndpoint(cl, appConfig), adminMiddleware)
}

// JSON read-back of an installed model's YAML config (used by the
// standalone MCP server so it can call get_model_config over REST).
router.GET("/api/models/config-yaml/:name", localai.GetConfigEndpoint(cl, appConfig), adminMiddleware)

detectionHandler := localai.DetectionEndpoint(cl, ml, appConfig)
router.POST("/v1/detection",
detectionHandler,
Expand Down
16 changes: 16 additions & 0 deletions core/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,19 @@ func AddNode(serviceID string, node schema.NodeData) {
}
nodes[serviceID][node.ID] = node
}

// ReplaceNodes swaps the local view of nodes for serviceID with the given
// snapshot, keyed by node ID. An empty serviceID selects defaultServicesID.
//
// Because the previous map is replaced rather than merged, any node absent
// from nodesSlice is dropped, which keeps stale entries from accumulating.
// If nodesSlice contains several entries with the same ID, the last one wins.
func ReplaceNodes(serviceID string, nodesSlice []schema.NodeData) {
	key := serviceID
	if key == "" {
		key = defaultServicesID
	}

	mu.Lock()
	defer mu.Unlock()

	snapshot := make(map[string]schema.NodeData, len(nodesSlice))
	for i := range nodesSlice {
		entry := nodesSlice[i]
		snapshot[entry.ID] = entry
	}
	nodes[key] = snapshot
}
Loading