diff --git a/core/application/distributed.go b/core/application/distributed.go index 00c39422d8b9..3235e4304f94 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -357,6 +357,15 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade Pressure: pressure, }) + // Wire staging-progress broadcasting so file-staging shows up on every + // replica, not just the one performing the transfer. Without this, a + // /api/operations poll that round-robins onto a peer sees no staging row and + // the progress flickers. The origin publishes; peers mirror via the wildcard. + router.StagingTracker().SetPublisher(natsClient) + if _, err := router.StagingTracker().SubscribeBroadcasts(natsClient); err != nil { + xlog.Warn("Failed to subscribe to staging progress broadcasts", "error", err) + } + // Create ReplicaReconciler for auto-scaling model replicas. Adapter + // RegistrationToken feed the state-reconciliation passes: pending op // drain uses the adapter, and model health probes use the token to auth diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 1bee20f4419c..7d099460c3f0 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -64,6 +64,22 @@ func SubjectGalleryProgress(opID string) string { return subjectGalleryPrefix + sanitizeSubjectToken(opID) + ".progress" } +// SubjectStagingProgress returns the NATS subject a frontend replica publishes +// file-staging progress on. Staging progress is otherwise per-process state +// (the SmartRouter's in-memory StagingTracker), so without this broadcast a +// /api/operations poll that round-robins onto a replica that did not originate +// the staging op sees nothing - the progress row flickers in multi-replica +// deployments. Peers subscribe to the wildcard and merge. +func SubjectStagingProgress(modelID string) string { + return subjectStagingPrefix + sanitizeSubjectToken(modelID) + ".progress" +} + +const subjectStagingPrefix = "staging." + +// SubjectStagingProgressWildcard matches every replica's staging-progress +// broadcasts so a peer can mirror staging ops it did not originate. +const SubjectStagingProgressWildcard = "staging.*.progress" + // SubjectGalleryOpStart and SubjectGalleryOpEnd are broadcast subjects for the // in-memory OpCache lifecycle. Frontend replicas publish to these when an // admin admits a new install/delete (Start) and when an operation is diff --git a/core/services/nodes/staging_progress.go b/core/services/nodes/staging_progress.go index 3d066c0fa774..0a6ddc50ed60 100644 --- a/core/services/nodes/staging_progress.go +++ b/core/services/nodes/staging_progress.go @@ -5,58 +5,138 @@ import ( "fmt" "sync" "time" + + "github.com/mudler/LocalAI/core/services/messaging" ) // StagingStatus represents the current progress of a model staging operation. type StagingStatus struct { - ModelID string `json:"model_id"` - NodeName string `json:"node_name"` - FileName string `json:"file_name"` - BytesSent int64 `json:"bytes_sent"` - TotalBytes int64 `json:"total_bytes"` - Progress float64 `json:"progress"` // 0-100 overall progress - Speed string `json:"speed"` - FileIndex int `json:"file_index"` - TotalFiles int `json:"total_files"` - Message string `json:"message"` + ModelID string `json:"model_id"` + NodeName string `json:"node_name"` + FileName string `json:"file_name"` + BytesSent int64 `json:"bytes_sent"` + TotalBytes int64 `json:"total_bytes"` + Progress float64 `json:"progress"` // 0-100 overall progress + Speed string `json:"speed"` + FileIndex int `json:"file_index"` + TotalFiles int `json:"total_files"` + Message string `json:"message"` StartedAt time.Time `json:"started_at"` } +const ( + // stagingBroadcastInterval bounds how often byte-level UpdateFile ticks are + // re-broadcast to peers (leading-edge debounce). State transitions (Start, + // FileComplete, Complete) always publish so peers never miss them. + stagingBroadcastInterval = time.Second + // stagingRemoteTTL drops a mirrored (remote) op whose last update is older + // than this. NATS pub/sub is fire-and-forget, so a missed Done event would + // otherwise leave a phantom staging row on a peer forever; a live op + // refreshes its mirror at least every stagingBroadcastInterval. + stagingRemoteTTL = 60 * time.Second +) + +// stagingEntry wraps a StagingStatus with the bookkeeping needed to keep peer +// replicas consistent: whether this op is mirrored from a peer (remote) vs. +// owned locally, when it was last updated (for remote-mirror expiry), and when +// its byte progress was last broadcast (for debounce). +type stagingEntry struct { + status StagingStatus + remote bool + updatedAt time.Time + lastPub time.Time +} + // StagingTracker tracks active file staging operations in-memory. // Used by SmartRouter to publish progress and by /api/operations to surface it. +// +// In distributed mode each frontend replica runs its own tracker. The replica +// performing a transfer owns the op locally and broadcasts progress over NATS +// (SetPublisher); peers mirror it via ApplyRemote (SubscribeBroadcasts) so a +// /api/operations poll that round-robins onto any replica surfaces the op. type StagingTracker struct { - mu sync.RWMutex - active map[string]*StagingStatus + mu sync.RWMutex + active map[string]*stagingEntry + publisher messaging.Publisher +} + +// StagingProgressEvent is the wire payload a frontend replica broadcasts on +// SubjectStagingProgress so peer replicas can mirror a staging op they did not +// originate. Done signals the op finished (peers drop their mirrored copy). +type StagingProgressEvent struct { + ModelID string `json:"model_id"` + Status *StagingStatus `json:"status,omitempty"` + Done bool `json:"done"` } // NewStagingTracker creates a new tracker. func NewStagingTracker() *StagingTracker { return &StagingTracker{ - active: make(map[string]*StagingStatus), + active: make(map[string]*stagingEntry), } } +// SetPublisher wires the NATS publisher used to broadcast staging progress to +// peer replicas. No-op publisher (nil) keeps the tracker standalone. +func (t *StagingTracker) SetPublisher(p messaging.Publisher) { + t.mu.Lock() + defer t.mu.Unlock() + t.publisher = p +} + +// SubscribeBroadcasts subscribes to peer replicas' staging-progress broadcasts +// and mirrors them into this tracker, so /api/operations on any replica surfaces +// staging ops it did not originate. Returns the subscription for cleanup. +func (t *StagingTracker) SubscribeBroadcasts(nc messaging.MessagingClient) (messaging.Subscription, error) { + return messaging.SubscribeJSON(nc, messaging.SubjectStagingProgressWildcard, func(evt StagingProgressEvent) { + if evt.ModelID == "" { + return + } + t.ApplyRemote(evt) + }) +} + +// publishStaging emits an event to the per-model staging subject. The publisher +// is captured by the caller under the lock and passed in, so publishing happens +// outside the lock (a slow NATS link must not stall the staging copy loop). +func publishStaging(p messaging.Publisher, evt StagingProgressEvent) { + if p == nil { + return + } + _ = p.Publish(messaging.SubjectStagingProgress(evt.ModelID), evt) +} + // Start registers a new staging operation for the given model. func (t *StagingTracker) Start(modelID, nodeName string, totalFiles int) { t.mu.Lock() - defer t.mu.Unlock() - t.active[modelID] = &StagingStatus{ - ModelID: modelID, - NodeName: nodeName, - TotalFiles: totalFiles, - StartedAt: time.Now(), - Message: "Preparing to stage model files", + e := &stagingEntry{ + status: StagingStatus{ + ModelID: modelID, + NodeName: nodeName, + TotalFiles: totalFiles, + StartedAt: time.Now(), + Message: "Preparing to stage model files", + }, + updatedAt: time.Now(), + // lastPub stays zero so the first UpdateFile tick always broadcasts. } + t.active[modelID] = e + pub := t.publisher + snap := e.status + t.mu.Unlock() + + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Status: &snap}) } // UpdateFile updates the tracker with current file transfer progress. func (t *StagingTracker) UpdateFile(modelID, fileName string, fileIndex int, bytesSent, totalBytes int64, speed string) { t.mu.Lock() - defer t.mu.Unlock() - s, ok := t.active[modelID] + e, ok := t.active[modelID] if !ok { + t.mu.Unlock() return } + s := &e.status s.FileName = fileName s.FileIndex = fileIndex s.BytesSent = bytesSent @@ -79,52 +159,121 @@ func (t *StagingTracker) UpdateFile(modelID, fileName string, fileIndex int, byt } else { s.Message = fmt.Sprintf("Staging %s", fileName) } + + e.updatedAt = time.Now() + // Leading-edge debounce: byte ticks fire many times per second; only + // re-broadcast at most once per stagingBroadcastInterval. + var pub messaging.Publisher + var snap StagingStatus + if time.Since(e.lastPub) >= stagingBroadcastInterval { + e.lastPub = time.Now() + pub = t.publisher + snap = e.status + } + t.mu.Unlock() + + if pub != nil { + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Status: &snap}) + } } // FileComplete marks a single file as done within a staging operation. func (t *StagingTracker) FileComplete(modelID string, fileIndex, totalFiles int) { t.mu.Lock() - defer t.mu.Unlock() - s, ok := t.active[modelID] + e, ok := t.active[modelID] if !ok { + t.mu.Unlock() return } + s := &e.status if totalFiles > 0 { s.Progress = float64(fileIndex) / float64(totalFiles) * 100 } s.BytesSent = 0 s.TotalBytes = 0 s.Speed = "" + e.updatedAt = time.Now() + e.lastPub = time.Now() + pub := t.publisher + snap := e.status + t.mu.Unlock() + + // Always broadcast a per-file completion so peers' progress bars advance. + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Status: &snap}) } // Complete removes a staging operation (it's done). func (t *StagingTracker) Complete(modelID string) { t.mu.Lock() - defer t.mu.Unlock() + _, ok := t.active[modelID] delete(t.active, modelID) + pub := t.publisher + t.mu.Unlock() + + if ok { + // Tell peers to drop their mirrored copy. + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Done: true}) + } +} + +// ApplyRemote merges a peer replica's staging broadcast into this tracker. It +// never re-broadcasts (no echo loop). A locally-owned op is authoritative: a +// remote event for the same model is ignored, so the origin replica receiving +// its own broadcast (and any stray peer event) cannot clobber or delete it. +func (t *StagingTracker) ApplyRemote(evt StagingProgressEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + if existing, ok := t.active[evt.ModelID]; ok && !existing.remote { + // We own this op locally — ignore peer chatter about it. + return + } + if evt.Done { + delete(t.active, evt.ModelID) + return + } + if evt.Status == nil { + return + } + t.active[evt.ModelID] = &stagingEntry{ + status: *evt.Status, + remote: true, + updatedAt: time.Now(), + } } -// GetAll returns a snapshot of all active staging operations. +// GetAll returns a snapshot of all active staging operations. Stale remote +// mirrors (a peer op whose Done event was missed) are pruned here so they don't +// linger in the UI. func (t *StagingTracker) GetAll() map[string]StagingStatus { - t.mu.RLock() - defer t.mu.RUnlock() + t.mu.Lock() + defer t.mu.Unlock() + now := time.Now() result := make(map[string]StagingStatus, len(t.active)) - for k, v := range t.active { - result[k] = *v + for k, e := range t.active { + if e.remote && now.Sub(e.updatedAt) > stagingRemoteTTL { + delete(t.active, k) + continue + } + result[k] = e.status } return result } -// Get returns the status of a specific staging operation, or nil if not active. +// Get returns the status of a specific staging operation, or nil if not active +// (or a stale remote mirror). func (t *StagingTracker) Get(modelID string) *StagingStatus { t.mu.RLock() defer t.mu.RUnlock() - s, ok := t.active[modelID] + e, ok := t.active[modelID] if !ok { return nil } - copy := *s - return © + if e.remote && time.Since(e.updatedAt) > stagingRemoteTTL { + return nil + } + s := e.status + return &s } // StagingProgressCallback is called by file stagers to report byte-level progress. diff --git a/core/services/nodes/staging_progress_broadcast_test.go b/core/services/nodes/staging_progress_broadcast_test.go new file mode 100644 index 000000000000..0f0f0db1ed39 --- /dev/null +++ b/core/services/nodes/staging_progress_broadcast_test.go @@ -0,0 +1,109 @@ +package nodes + +import ( + "encoding/json" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" +) + +// decodeStagingEvents extracts every StagingProgressEvent the fake messaging +// client captured, in publish order. +func decodeStagingEvents(mc *fakeMessagingClient) []StagingProgressEvent { + mc.mu.Lock() + defer mc.mu.Unlock() + var out []StagingProgressEvent + for _, p := range mc.published { + var evt StagingProgressEvent + if err := json.Unmarshal(p.Data, &evt); err != nil { + continue + } + if evt.ModelID == "" { + continue + } + out = append(out, evt) + } + return out +} + +var _ = Describe("StagingTracker cross-replica broadcast", func() { + Context("when a publisher is wired (distributed mode)", func() { + It("broadcasts staging progress so a peer replica surfaces an op it did not originate", func() { + mc := &fakeMessagingClient{} + origin := NewStagingTracker() + origin.SetPublisher(mc) + + origin.Start("model-x", "worker-1", 1) + origin.UpdateFile("model-x", "weights.gguf", 1, 5<<30, 10<<30, "100 MiB/s") + + events := decodeStagingEvents(mc) + Expect(events).ToNot(BeEmpty(), "writes must be broadcast over NATS") + Expect(mc.published[0].Subject).To(Equal(messaging.SubjectStagingProgress("model-x"))) + + // A peer replica that never ran the op merges the broadcast. + peer := NewStagingTracker() + for _, evt := range events { + peer.ApplyRemote(evt) + } + + all := peer.GetAll() + Expect(all).To(HaveKey("model-x")) + Expect(all["model-x"].NodeName).To(Equal("worker-1")) + Expect(all["model-x"].FileName).To(Equal("weights.gguf")) + Expect(all["model-x"].TotalBytes).To(Equal(int64(10 << 30))) + }) + + It("removes the op from the peer when the origin completes it", func() { + mc := &fakeMessagingClient{} + origin := NewStagingTracker() + origin.SetPublisher(mc) + + origin.Start("model-x", "worker-1", 1) + origin.Complete("model-x") + + peer := NewStagingTracker() + for _, evt := range decodeStagingEvents(mc) { + peer.ApplyRemote(evt) + } + Expect(peer.GetAll()).ToNot(HaveKey("model-x")) + }) + + It("does not let a peer broadcast clobber an op this replica is itself running", func() { + local := NewStagingTracker() + local.Start("model-x", "worker-local", 2) + local.UpdateFile("model-x", "weights.gguf", 1, 9<<30, 10<<30, "") + + // A stray/older remote event for the SAME modelID must not overwrite + // the authoritative local state, nor delete it. + local.ApplyRemote(StagingProgressEvent{ + ModelID: "model-x", + Status: &StagingStatus{ModelID: "model-x", NodeName: "worker-other", FileName: "stale.gguf"}, + }) + local.ApplyRemote(StagingProgressEvent{ModelID: "model-x", Done: true}) + + all := local.GetAll() + Expect(all).To(HaveKey("model-x")) + Expect(all["model-x"].NodeName).To(Equal("worker-local")) + Expect(all["model-x"].FileName).To(Equal("weights.gguf")) + }) + }) + + Context("when no publisher is wired (standalone mode)", func() { + It("does not broadcast", func() { + mc := &fakeMessagingClient{} + t := NewStagingTracker() + t.Start("model-x", "worker-1", 1) + t.UpdateFile("model-x", "weights.gguf", 1, 1<<30, 10<<30, "") + Expect(mc.published).To(BeEmpty()) + }) + }) +}) + +var _ = Describe("SubjectStagingProgress", func() { + It("namespaces by model id and matches the wildcard prefix", func() { + Expect(messaging.SubjectStagingProgress("model-x")).To(Equal("staging.model-x.progress")) + Expect(messaging.SubjectStagingProgressWildcard).To(Equal("staging.*.progress")) + }) +})