Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,17 @@ kubectl logs sampler -f
{"results": [{"data": {"c": {"samples": ["0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x0", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x1", "0x0", "0x1", "0x0", "0x0", "0x0", "0x1", "0x0", "0x0", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0", "0x0", "0x1", "0x1", "0x1", "0x0", "0x1", "0x1", "0x1", "0x1", "0x1", "0x1", "0x0", "0x0", "0x0", "0x0"], "num_bits": 1}}, "metadata": {"circuit_metadata": {}}}], "metadata": {"execution": {"execution_spans": [[{"date": "2026-06-06T19:04:43.221657"}, {"date": 
"2026-06-06T19:04:44.372421"}, {"0": [[256], [0, 1], [0, 256]]}]]}, "version": 2}}
2026/06/06 19:04:50 done: 2070 bytes from ibm_marrakesh
```
Boum!
Boum! You will see in the fluence logs that when the pod completes, the fluxion job is cancelled, freeing the resources.

```bash
kubectl logs -n kube-system fluence-75d6848778-g4lh6
...
I0610 18:33:05.843325 1 eventhandlers.go:443] "Delete event for scheduled pod" pod="default/sampler"
🌀 Cancel jobid: 1
(env) (base) vanessa@vanessa-ThinkPad-P14s-Gen-4:~/Desktop/Code/fluence$ kubectl get pods
NAME READY STATUS RESTARTS AGE
sampler 0/1 Completed 0 24s
```

### A note on deletion

Expand Down
11 changes: 10 additions & 1 deletion deploy/kind-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ nodes:
kind: ClusterConfiguration
apiServer:
extraArgs:
# Turn on the alpha API group + the API itself.
- name: runtime-config
value: "scheduling.k8s.io/v1alpha2=true"
- name: feature-gates
Expand All @@ -22,5 +23,13 @@ nodes:
extraArgs:
- name: feature-gates
value: "GenericWorkload=true,GangScheduling=true"
controllerManager:
extraArgs:
# The podgroup-protection-controller (which removes the
# scheduling.k8s.io/podgroup-protection finalizer once a PodGroup's
# pods are gone) is gated on GenericWorkload. Without this, PodGroup
# deletion hangs on the finalizer forever.
- name: feature-gates
value: "GenericWorkload=true"
- role: worker
- role: worker
- role: worker
222 changes: 172 additions & 50 deletions pkg/fluence/fluence.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,44 @@ package fluence
import (
"context"
"fmt"
"log"
"os"
"strconv"
"sync"

"github.com/converged-computing/fluence/pkg/cluster"
"github.com/converged-computing/fluence/pkg/graph"
"github.com/converged-computing/fluence/pkg/placement"

corev1 "k8s.io/api/core/v1"
schedv1a2 "k8s.io/api/scheduling/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
fwk "k8s.io/kube-scheduler/framework"
)

// The scheduler-framework types live in the staging module
// k8s.io/kube-scheduler/framework (imported as fwk). The plugin is built into
// the scheduler binary via cmd/fluence. Signatures here are verified against
// k8s.io/kube-scheduler v0.36.0; keep them in lockstep with the k8s version you
// run on (CycleState and NodeInfo are interfaces, and PreFilter takes the
// candidate node list).
// k8s.io/kube-scheduler v0.36.0.

// Name is the plugin name registered with the scheduler and referenced in the
// KubeSchedulerConfiguration.
const Name = "Fluence"

// groupAlloc is the in-memory record of a group's Fluxion allocation. It is a
// rebuildable, within-lifetime memo: its job is race-free "match once per group"
// dedup on the scheduling path (the durable record is the jobid annotation on
// the owning object). It does not survive a scheduler restart, which is fine —
// the graph itself is rebuilt fresh on restart.
type groupAlloc struct {
// place is the placement chosen for the group; Filter reads its Nodes and
// PreBind reads its Backend.
place placement.Placement
// jobid is the Fluxion job number returned by the match (req.Number in
// PreFilter). It is what gets passed to the matcher's Cancel when the owning
// object is deleted.
jobid uint64
}

// Fluence is a scheduler-framework plugin that places whole pod groups by
// matching them against a flux-sched resource graph built from the live cluster
// (plus any configured quantum resources). Gang/all-or-nothing semantics are
Expand All @@ -37,10 +49,14 @@ type Fluence struct {
handle fwk.Handle
matcher *graph.FluxionGraph

// matcherMu serializes all access to the cgo Fluxion client, which is not
// thread-safe. Match runs on the (sequential) scheduling path; Cancel runs in
// informer handler goroutines, so the two can race without this.
matcherMu sync.Mutex

mu sync.Mutex
// placement maps a pod-group key to the placement chosen for the group
// (nodes + allocated backend).
placement map[string]placement.Placement
// placement maps a group key to its allocation (nodes, backend, jobid).
placement map[string]groupAlloc
}

var (
Expand All @@ -50,28 +66,25 @@ var (
)

// New builds the plugin: discover cluster nodes, optionally inject quantum
// resources, write the JGF graph, and initialize the Fluxion matcher.
//
// Configuration (for now via env; can move to plugin args):
// resources, write the JGF graph, initialize the Fluxion matcher, and register
// the delete handlers that cancel allocations when their owning object is gone.
//
// FLUENCE_RESOURCES path to a YAML/JSON resources config (e.g. quantum
// backends). Unset = classical-only graph.
// FLUENCE_MATCH_POLICY Fluxion match policy (default "first")
func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error) {
// List nodes via the API. The scheduler's shared snapshot is empty at
// plugin-construction time (it is populated per scheduling cycle once
// informers have synced), so a direct List is what actually gives us the
// cluster's compute. We assume a static cluster for now: this is read once
// at startup and the graph is not updated as nodes come and go.
// plugin-construction time, so a direct List is what gives us the cluster's
// compute. Static cluster for now: read once, graph not updated live.
nodeList, err := h.ClientSet().CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("list nodes: %w", err)
}

// Classical compute always comes from the cluster nodes. Quantum/other
// resources are added only when a resources config is present. FLUENCE_RESOURCES
// is set on the base scheduler but the file only exists once the resources
// add-on is applied, so a missing file is normal (classical-only), not fatal.
// Quantum/other resources are added only when a resources config is present.
// FLUENCE_RESOURCES is set on the base scheduler but the file only exists once
// the resources add-on is applied, so a missing file is classical-only, not
// fatal.
opts := cluster.Options{}
if path := os.Getenv("FLUENCE_RESOURCES"); path != "" {
raw, err := os.ReadFile(path)
Expand Down Expand Up @@ -107,19 +120,21 @@ func New(ctx context.Context, _ runtime.Object, h fwk.Handle) (fwk.Plugin, error
matcher := &graph.FluxionGraph{MatchFormat: "jgf"}
matcher.Init(tmp.Name(), os.Getenv("FLUENCE_MATCH_POLICY"), "")

return &Fluence{
f := &Fluence{
handle: h,
matcher: matcher,
placement: map[string]placement.Placement{},
}, nil
placement: map[string]groupAlloc{},
}
f.registerCancelHandlers()
return f, nil
}

// Name reports the identifier this plugin is known by, i.e. the package-level
// Name constant.
func (f *Fluence) Name() string {
	return Name
}

// PreFilter runs once per scheduling cycle for a pod. The first pod of a group
// triggers a single match for the whole gang; the resulting node assignment is
// cached and consumed by Filter for every pod in the group.
// triggers a single match for the whole gang; the resulting placement (and the
// Fluxion jobid) is cached and consumed by Filter/PreBind for every pod.
func (f *Fluence) PreFilter(
ctx context.Context,
state fwk.CycleState,
Expand Down Expand Up @@ -149,7 +164,9 @@ func (f *Fluence) PreFilter(
return nil, fwk.AsStatus(err)
}

f.matcherMu.Lock()
req, err := f.matcher.MatchAllocateSpec(specYAML)
f.matcherMu.Unlock()
if err != nil {
return nil, fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("fluxion match failed: %v", err))
}
Expand All @@ -160,18 +177,16 @@ func (f *Fluence) PreFilter(
if len(place.Nodes) == 0 && place.Backend == "" {
return nil, fwk.NewStatus(fwk.Unschedulable, "fluxion returned no allocation")
}
// Note: a quantum-only allocation has a Backend but no Nodes (a qpu vertex
// lives under the qgateway, not under a compute node). That is valid — the
// backend is a remote API reachable from any node — so we do not require a
// node here; Filter imposes no node constraint in that case.
// A quantum-only allocation has a Backend but no Nodes (a qpu vertex lives
// under the qgateway, not under a compute node). That is valid — the backend
// is reachable from any node — so Filter imposes no node constraint then.

f.mu.Lock()
f.placement[group] = place
f.placement[group] = groupAlloc{place: place, jobid: req.Number}
f.mu.Unlock()

// The allocated backend is recorded onto each pod in PreBind (container env
// is immutable post-creation, but annotations can be patched); the
// webhook-injected downward-API env then surfaces it as QRMI_BACKEND.
// The jobid (for cancel) and any backend (for the webhook env) are written
// onto the owning object in PreBind, the commit phase.
return nil, fwk.NewStatus(fwk.Success)
}

Expand All @@ -188,7 +203,7 @@ func (f *Fluence) Filter(
group := groupKey(pod)

f.mu.Lock()
nodes := f.placement[group].Nodes
nodes := f.placement[group].place.Nodes
f.mu.Unlock()

// A quantum-only allocation pins no node (the backend is a remote API any
Expand All @@ -206,52 +221,159 @@ func (f *Fluence) Filter(
return fwk.NewStatus(fwk.Unschedulable, "node not in fluxion allocation for this group")
}

// PreBindPreFlight runs before PreBind. It returns Success when this plugin has
// a backend to stamp on the pod (a quantum group), and Skip otherwise so the
// framework doesn't call PreBind needlessly. It is lightweight: it only reads
// the cached group placement, no API calls.
// PreBindPreFlight runs before PreBind. It returns Success when we have a cached
// allocation for the pod's group (so PreBind can record the jobid, and stamp the
// backend for a quantum pod), and Skip otherwise.
func (f *Fluence) PreBindPreFlight(
ctx context.Context,
state fwk.CycleState,
pod *corev1.Pod,
nodeName string,
) (*fwk.PreBindPreFlightResult, *fwk.Status) {
f.mu.Lock()
backend := f.placement[groupKey(pod)].Backend
_, ok := f.placement[groupKey(pod)]
f.mu.Unlock()
if backend == "" {
if !ok {
return nil, fwk.NewStatus(fwk.Skip)
}
return nil, fwk.NewStatus(fwk.Success)
}

// PreBind writes the backend Fluxion allocated for this pod's group onto the pod
// as the annotation placement.BackendAnnotation. The mutating webhook has
// already wired a downward-API env (QRMI_BACKEND) that reads this annotation, so
// the container sees the backend as an ordinary env var. Container env cannot be
// patched after creation, which is why the value travels via an annotation.
// PreBind records, in the commit phase, the durable state for this group:
// - the Fluxion jobid onto the owning object (the PodGroup for a gang, else the
// pod) so the allocation can be cancelled when that object is deleted;
// - for a quantum group, the allocated backend onto the pod, which the webhook-
// injected downward-API env surfaces as QRMI_BACKEND (container env is
// immutable post-creation, so the value must travel via an annotation).
func (f *Fluence) PreBind(
ctx context.Context,
state fwk.CycleState,
pod *corev1.Pod,
nodeName string,
) *fwk.Status {
f.mu.Lock()
backend := f.placement[groupKey(pod)].Backend
alloc, ok := f.placement[groupKey(pod)]
f.mu.Unlock()
if backend == "" {
return fwk.NewStatus(fwk.Success) // nothing to do; PreBindPreFlight skips these
if !ok {
return fwk.NewStatus(fwk.Success) // not ours; nothing to record
}

patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.BackendAnnotation, backend)
_, err := f.handle.ClientSet().CoreV1().Pods(pod.Namespace).Patch(
ctx, pod.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{})
if err != nil {
return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err))
if err := f.recordJobID(ctx, pod, alloc.jobid); err != nil {
return fwk.AsStatus(fmt.Errorf("record jobid: %w", err))
}
if alloc.place.Backend != "" {
if err := f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.BackendAnnotation, alloc.place.Backend); err != nil {
return fwk.AsStatus(fmt.Errorf("stamp backend annotation: %w", err))
}
}
return fwk.NewStatus(fwk.Success)
}

// recordJobID persists the Fluxion jobid as an annotation
// (placement.JobIDAnnotation) on whichever object owns the allocation.
//
// An ungrouped pod owns its own allocation, so the annotation is patched onto
// the pod itself. A pod that belongs to a PodGroup has the annotation patched
// onto that PodGroup (same namespace as the pod) instead.
//
// It returns the error from the underlying API patch call, or nil on success.
func (f *Fluence) recordJobID(ctx context.Context, pod *corev1.Pod, jobid uint64) error {
	jobText := strconv.FormatUint(jobid, 10)

	owner := placement.PodGroupName(pod)
	if owner == "" {
		// No scheduling group: the pod is the owning object.
		return f.patchPodAnnotation(ctx, pod.Namespace, pod.Name, placement.JobIDAnnotation, jobText)
	}

	// Grouped pod: merge-patch the annotation onto the PodGroup.
	body := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, placement.JobIDAnnotation, jobText))
	podGroups := f.handle.ClientSet().SchedulingV1alpha2().PodGroups(pod.Namespace)
	_, patchErr := podGroups.Patch(ctx, owner, types.MergePatchType, body, metav1.PatchOptions{})
	return patchErr
}

// patchPodAnnotation sets a single annotation (key -> val) on the pod ns/name
// using a JSON merge patch, leaving the rest of the object untouched. It returns
// the error from the API call, or nil on success.
func (f *Fluence) patchPodAnnotation(ctx context.Context, ns, name, key, val string) error {
	body := []byte(fmt.Sprintf(`{"metadata":{"annotations":{%q:%q}}}`, key, val))
	pods := f.handle.ClientSet().CoreV1().Pods(ns)
	if _, patchErr := pods.Patch(ctx, name, types.MergePatchType, body, metav1.PatchOptions{}); patchErr != nil {
		return patchErr
	}
	return nil
}

// registerCancelHandlers watches PodGroup and Pod deletions and frees the
// corresponding Fluxion allocation via cancelGroup. Grouped pods are ignored by
// the pod handler (their allocation lives on the PodGroup); ungrouped pods are
// handled there. The framework has no deletion extension point, so this is
// informer-driven.
//
// Both handlers accept either the deleted object itself or a
// cache.DeletedFinalStateUnknown tombstone wrapping it; anything else is
// ignored.
//
// A failure to attach a handler is logged rather than silently dropped: without
// the handler, allocations owned by that kind of object would never be
// cancelled, and that should be visible in the scheduler logs.
func (f *Fluence) registerCancelHandlers() {
	sif := f.handle.SharedInformerFactory()

	if _, err := sif.Scheduling().V1alpha2().PodGroups().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			pg, ok := obj.(*schedv1a2.PodGroup)
			if !ok {
				// The informer may hand us a tombstone when the final state
				// of the object was not observed; unwrap it.
				tomb, isTomb := obj.(cache.DeletedFinalStateUnknown)
				if !isTomb {
					return
				}
				if pg, ok = tomb.Obj.(*schedv1a2.PodGroup); !ok {
					return
				}
			}
			f.cancelGroup(pg.Namespace+"/"+pg.Name, pg.Annotations)
		},
	}); err != nil {
		log.Printf("fluence: register PodGroup delete handler failed: %v", err)
	}

	if _, err := sif.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				tomb, isTomb := obj.(cache.DeletedFinalStateUnknown)
				if !isTomb {
					return
				}
				if pod, ok = tomb.Obj.(*corev1.Pod); !ok {
					return
				}
			}
			// Grouped pods' allocation is owned by the PodGroup; only the
			// PodGroup's deletion frees it. Act on ungrouped pods only.
			if placement.PodGroupName(pod) != "" {
				return
			}
			f.cancelGroup(pod.Namespace+"/"+pod.Name, pod.Annotations)
		},
	}); err != nil {
		log.Printf("fluence: register Pod delete handler failed: %v", err)
	}
}

// cancelGroup releases the Fluxion allocation that belonged to a deleted owning
// object and drops its entry from the in-memory memo.
//
// key is the memo key for the object ("namespace/name" as built by the delete
// handlers) and ann is the object's annotations. The jobid is taken from the
// jobid annotation when it parses; otherwise (e.g. the object was deleted
// between PreFilter and PreBind, before the annotation was written) it is
// looked up in the memo by key. When neither source has a jobid the call is a
// no-op.
//
// A failed Cancel is logged, not returned, and the memo entry is removed either
// way. Access to the matcher is serialized through matcherMu.
func (f *Fluence) cancelGroup(key string, ann map[string]string) {
	jobid, annotated := parseJobID(ann)
	if !annotated {
		f.mu.Lock()
		memo, known := f.placement[key]
		f.mu.Unlock()
		if !known {
			// Never scheduled by us, or already cancelled.
			return
		}
		jobid = memo.jobid
	}

	f.matcherMu.Lock()
	cancelErr := f.matcher.Cancel(jobid)
	f.matcherMu.Unlock()
	if cancelErr != nil {
		log.Printf("fluence: cancel jobid %d for %s failed: %v", jobid, key, cancelErr)
	}

	f.mu.Lock()
	delete(f.placement, key)
	f.mu.Unlock()
}

// parseJobID extracts the Fluxion jobid stored under placement.JobIDAnnotation
// in ann. It returns (jobid, true) when the annotation is present and is a
// valid base-10 unsigned 64-bit integer, and (0, false) when it is absent,
// empty, or does not parse. A nil map is treated as having no annotation.
func parseJobID(ann map[string]string) (uint64, bool) {
	if text := ann[placement.JobIDAnnotation]; text != "" {
		if id, parseErr := strconv.ParseUint(text, 10, 64); parseErr == nil {
			return id, true
		}
	}
	return 0, false
}

// groupPods returns the pods belonging to the same native PodGroup as pod
// (spec.schedulingGroup.podGroupName). That field is not label-selectable, so we
// list the namespace and filter in code. A pod with no scheduling group is its
Expand Down
Loading
Loading