Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions core/services/nodes/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,21 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
}
}

// Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates
loadModel := func() (*RouteResult, error) {
// Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates.
//
// Detach the cold-load from the caller's context. Staging a model can
// transfer multiple GB to a worker, which takes far longer than any client
// keeps its HTTP request open — a browser refresh, an ingress/LB idle
// timeout, or a round-robined retry landing on another replica all cancel
// the request context. If staging were bound to it, the multi-GB upload
// aborts with "context canceled" mid-transfer and large models can never
// finish staging (the model-load outage). WithoutCancel keeps the request's
// values (prefix chain, etc.) but drops its cancellation/deadline. Each
// long step still has its own bound (the file stager's resume budget,
// LoadModel's 5m timeout), and the per-model advisory lock below de-dupes
// concurrent loaders across replicas.
loadCtx := context.WithoutCancel(ctx)
loadModel := func(ctx context.Context) (*RouteResult, error) {
// Re-check after acquiring lock — another request may have loaded it
node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref)
if err == nil && node != nil {
Expand Down Expand Up @@ -433,9 +446,9 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
if r.db != nil {
lockKey := advisorylock.KeyFromString("model-load:" + trackingKey)
var result *RouteResult
lockErr := advisorylock.WithLockCtx(ctx, r.db, lockKey, func() error {
lockErr := advisorylock.WithLockCtx(loadCtx, r.db, lockKey, func() error {
var err error
result, err = loadModel()
result, err = loadModel(loadCtx)
return err
})
if lockErr != nil {
Expand All @@ -444,7 +457,7 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
return result, nil
}
// No DB (non-distributed) — proceed without lock
return loadModel()
return loadModel(loadCtx)
}

// parseSelectorJSON decodes a JSON node selector string into a map.
Expand Down
80 changes: 80 additions & 0 deletions core/services/nodes/router_staging_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package nodes

import (
"context"
"errors"
"os"
"path/filepath"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/mudler/LocalAI/core/services/messaging"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
)

// cancelOnStageStager is a test double for the file stager. It models the
// HTTP request that triggered a cold load being abandoned (client disconnect,
// ingress idle-timeout) at the instant a multi-GB file begins staging: on
// EnsureRemote it fires cancelRequest and then captures the error state of the
// context it was handed, so a test can tell whether staging was still tied to
// the request's lifetime.
type cancelOnStageStager struct {
	// fakeFileStager is declared elsewhere in this package; presumably it
	// supplies the remaining stager methods — confirm against its definition.
	fakeFileStager

	// cancelRequest cancels the triggering request's context. It may be left
	// nil, in which case EnsureRemote cancels nothing.
	cancelRequest context.CancelFunc

	// staged is set once EnsureRemote has been invoked.
	staged bool

	// ctxErrOnStage is the value of ctx.Err() observed inside EnsureRemote
	// after cancelRequest ran; nil means the staging context survived.
	ctxErrOnStage error
}

// EnsureRemote stands in for the real upload. In order, it:
//
//  1. marks that staging was attempted,
//  2. cancels the triggering request (when a cancel func was supplied) to
//     mimic the client giving up mid-transfer on a minutes-long request,
//  3. snapshots ctx.Err() so the test can see whether the context passed to
//     the stager was affected by that cancellation.
//
// It always reports success, returning "/remote/" + key. The two unnamed
// string parameters are ignored.
func (s *cancelOnStageStager) EnsureRemote(ctx context.Context, _, _, key string) (string, error) {
	s.staged = true

	if abandon := s.cancelRequest; abandon != nil {
		abandon()
	}

	// A multi-GB upload has to outlive the request. Were ctx derived from the
	// request context it would be cancelled by now, and a real HTTP stager
	// would fail with "context canceled" — the production outage this guards.
	s.ctxErrOnStage = ctx.Err()

	return "/remote/" + key, nil
}

// Regression spec for the cold-load path of SmartRouter.Route: the context
// handed to the file stager must stay alive even when the request context
// that triggered the load is cancelled in the middle of staging.
var _ = Describe("Route cold-load staging context", func() {
	It("detaches staging from the request context so a client disconnect cannot abort a multi-GB transfer", func() {
		// The request context, plus a stager that cancels it as soon as
		// staging begins.
		reqCtx, abandonRequest := context.WithCancel(context.Background())
		defer abandonRequest()
		spy := &cancelOnStageStager{cancelRequest: abandonRequest}

		// The model file has to exist on disk: stageModelFiles skips paths
		// that are missing and would then never reach the stager.
		weightsPath := filepath.Join(GinkgoT().TempDir(), "big.gguf")
		writeErr := os.WriteFile(weightsPath, []byte("weights"), 0o644)
		Expect(writeErr).To(Succeed())

		// Registry reports the model as not loaded anywhere and offers one
		// idle node to place it on.
		registry := &fakeModelRouter{
			findAndLockErr: errors.New("not loaded"),
			findIdleNode: &BackendNode{
				ID:      "n1",
				Name:    "worker-1",
				Address: "10.0.0.1:50051",
			},
		}
		clients := &stubClientFactory{
			client: &stubBackend{loadResult: &pb.Result{Success: true}},
		}
		installer := &fakeUnloader{
			installReply: &messaging.BackendInstallReply{
				Success: true,
				Address: "10.0.0.1:9001",
			},
		}

		// DB is left nil: no advisory lock is taken, yet the load still runs
		// on the same detached context.
		sr := NewSmartRouter(registry, SmartRouterOptions{
			Unloader:      installer,
			ClientFactory: clients,
			FileStager:    spy,
		})

		modelName := filepath.Join("models", "big.gguf")
		opts := &pb.ModelOptions{Model: "big.gguf", ModelFile: weightsPath}
		routed, routeErr := sr.Route(reqCtx, "big-model", modelName, "llama-cpp", opts, false)

		Expect(routeErr).NotTo(HaveOccurred())
		Expect(routed).NotTo(BeNil())
		Expect(spy.staged).To(BeTrue(), "staging must have been attempted")
		Expect(spy.ctxErrOnStage).NotTo(HaveOccurred(),
			"staging context must survive cancellation of the triggering request")
	})
})
Loading