Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion cmd/bbr/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
"sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server"
bbrutils "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/utils"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

Expand Down Expand Up @@ -102,8 +103,24 @@ func (r *Runner) Run(ctx context.Context) error {
return err
}

//Initialize PluginRegistry and request/response PluginsChain instances
registry, requestChain, responseChain, err := bbrutils.InitPlugins()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separate factory registration and plugin creation?

if err != nil {
setupLog.Error(err, "Failed to initialize plugins")
return err
}

setupLog.Info("BBR started with:",
"registry", registry,
"requestChain", requestChain,
"responseChain", responseChain)

// Setup runner.
serverRunner := runserver.NewDefaultExtProcServerRunner(*grpcPort, *streaming)
serverRunner := runserver.NewDefaultExtProcServerRunner(*grpcPort,
*streaming,
registry,
requestChain,
responseChain)

// Register health server.
if err := registerHealthServer(mgr, *grpcHealthPort); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bbr/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Body-Based Routing
This package provides an extension that can be deployed to write the `model`
HTTP body parameter as a header (X-Gateway-Model-Name) so as to enable routing capabilities on the
By deafult this package provides a plugable extension that can be to set the `model`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
By deafult this package provides a plugable extension that can be to set the `model`
By deafult this package provides a pluggable extension that can be to set the `model`

HTTP body parameter as a header (`X-Gateway-Model-Name`) so as to enable routing capabilities on the
model name.

As per OpenAI spec, it is standard for the model name to be included in the
Expand Down
58 changes: 58 additions & 0 deletions pkg/bbr/framework/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do not introduce import aliases when not required for clarity or by name conflict

)

const (
RequestPluginChain = "REQUEST_PLUGINS_CHAIN"
ResponsePluginChain = "RESPONSE_PLUGINS_CHAIN"
)

// placeholder for Plugin constructors
type PluginFactoryFunc func() bbrplugins.BBRPlugin //any no-argument function that returns bbrplugins.BBRPlugin can be assigned to this type (including a constructor function)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please take a look at definition of factories in EPP?


// ----------------------- Registry Interface --------------------------------------------------
// PluginRegistry defines operations for managing plugin factories and plugin instances
type PluginRegistry interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See notes on proposal with respect to

  • needing an interface at all;
  • wide interfaces; and
  • mixing factories and instances in the same collection

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For brevity, minimize repitition of comments made on the proposal's code sections.

RegisterFactory(typeKey string, factory PluginFactoryFunc) error //constructors
RegisterPlugin(plugin bbrplugins.BBRPlugin) error //registers a plugin instance (the instance is supposed to be created via the factory first)
GetFactory(typeKey string) (PluginFactoryFunc, error)
GetPlugin(typeKey string) (bbrplugins.BBRPlugin, error)
GetFactories() map[string]PluginFactoryFunc
GetPlugins() map[string]bbrplugins.BBRPlugin
ListPlugins() []string
ListFactories() []string
CreatePlugin(typeKey string) (bbrplugins.BBRPlugin, error)
ContainsFactory(typeKey string) bool
ContainsPlugin(typeKey string) bool
String() string
}

// ------------------------ Ordered Plugins Interface ------------------------------------------
// PluginsChain is used to define a specific order of execution of the plugin instances stored in the registry
type PluginsChain interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need interface? is scope right-sized?
is PluginChain a BBRPlugin as well?

AddPlugin(typeKey string, registry PluginRegistry) error //to be added to the chain the plugin should be registered in the registry first
AddPluginAtInd(typeKey string, i int, r PluginRegistry) error //only affects the instance of the plugin chain
GetPlugin(index int, registry PluginRegistry) (bbrplugins.BBRPlugin, error) //retrieves i-th plugin as defined in the chain from the registry
Length() int
GetPlugins() []string
Run(bodyBytes []byte, registry PluginRegistry) (map[string]string, []byte, error) //return potentially mutated body and all headers map safely merged
String() string
}
276 changes: 276 additions & 0 deletions pkg/bbr/framework/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
"fmt"

bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins"
)

// -------------------- INTERFACES -----------------------------------------------------------------------
// Interfaces are defined in "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework/interfaces.go"

// --------------------- PluginRegistry implementation ---------------------------------------------------

// pluginRegistry implements PluginRegistry
type pluginRegistry struct {
pluginsFactory map[string]PluginFactoryFunc //constructors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest using factories and plugins for fields.

plugins map[string]bbrplugins.BBRPlugin // instances
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

factories are likley registered on start. Can plugin creation be done in parallel by multiple go routines? If so, consider concurrency control.

}

// NewPluginRegistry creates a new instance of pluginRegistry
func NewPluginRegistry() PluginRegistry {
return &pluginRegistry{
pluginsFactory: make(map[string]PluginFactoryFunc),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggest using factories and plugins for fields.

plugins: make(map[string]bbrplugins.BBRPlugin),
}
}

// Register a plugin factory by type key (e.g., "ModelSelector", "MetadataExtractor")
func (r *pluginRegistry) RegisterFactory(typeKey string, factory PluginFactoryFunc) error {
//validate whether already registered
alreadyRegistered := r.ContainsFactory(typeKey)
if alreadyRegistered {
err := fmt.Errorf("factory fot plugin interface type %s is already registered", typeKey)
return err
}
r.pluginsFactory[typeKey] = factory

return nil
}

// Register a plugin instance (created through the appropriate factory)
func (r *pluginRegistry) RegisterPlugin(plugin bbrplugins.BBRPlugin) error {
//validate whether this interface is supported
alreadyRegistered := r.ContainsPlugin(plugin.TypedName().Type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coming from EPP, my expectation is that plugin instances have the same type as their factory and that multiple plugin instances (of the same type) may co-exist.
As a user of this, it seems natural (IMO) to register the factory using the plugin Type, and the plugin using the type and name


if alreadyRegistered {
err := fmt.Errorf("plugin implementing interface type %s is already registered", plugin.TypedName().Type)
return err
}

// validate that the factory for this plugin is registered: always register factory before the plugin
if _, ok := r.pluginsFactory[plugin.TypedName().Type]; !ok {
err := fmt.Errorf("no plugin factory registered for plugin interface type %s", plugin.TypedName().Type)
return err
}
r.plugins[plugin.TypedName().Type] = plugin

return nil
}

// Retrieves a plugin factory by type key
func (r *pluginRegistry) GetFactory(typeKey string) (PluginFactoryFunc, error) {
if pluginFactory, ok := r.pluginsFactory[typeKey]; ok {
return pluginFactory, nil
}
return nil, fmt.Errorf("plugin type %s not found", typeKey)
}

// Retrieves a plugin instance by type key
func (r *pluginRegistry) GetPlugin(typeKey string) (bbrplugins.BBRPlugin, error) {
if plugin, ok := r.plugins[typeKey]; ok {
return plugin, nil
}
return nil, fmt.Errorf("plugin type %s not found", typeKey)
}

// Constructs a new plugin (a caller can perform either type assertion of a concrete implementation of the BBR plugin)
func (r *pluginRegistry) CreatePlugin(typeKey string) (bbrplugins.BBRPlugin, error) {
if factory, ok := r.pluginsFactory[typeKey]; ok {
plugin := factory()
return plugin, nil
}
return nil, fmt.Errorf("plugin %s not registered", typeKey)
}

// Removes a plugin factory by type key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed?

func (r *pluginRegistry) UnregisterFactory(typeKey string) error {
if _, ok := r.pluginsFactory[typeKey]; ok {
delete(r.pluginsFactory, typeKey)
return nil
}
return fmt.Errorf("plugin (%s) not found", typeKey)
}

// ListPlugins lists all registered plugins
func (r *pluginRegistry) ListPlugins() []string {
typeKeys := make([]string, 0, len(r.plugins))
for k := range r.plugins {
typeKeys = append(typeKeys, k)
}
return typeKeys
}

// ListPlugins lists all registered plugins; this functionis not really needed. Just for sanity checks and tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in that case you can make it private and put tests in the same package?

func (r *pluginRegistry) ListFactories() []string {
typeKeys := make([]string, 0, len(r.pluginsFactory))
for k := range r.pluginsFactory {
typeKeys = append(typeKeys, k)
}
return typeKeys
}

// Get factories
func (r *pluginRegistry) GetFactories() map[string]PluginFactoryFunc {
return r.pluginsFactory
}

// Get plugins
func (r *pluginRegistry) GetPlugins() map[string]bbrplugins.BBRPlugin {
return r.plugins
}

// Checks for presense of a factory in this registry
func (r *pluginRegistry) ContainsFactory(typeKey string) bool {
_, exists := r.pluginsFactory[typeKey]
return exists
}

// Helper: Checks for presense of a plugin in this registry
func (r *pluginRegistry) ContainsPlugin(typeKey string) bool {
_, exists := r.plugins[typeKey]
return exists
}

func (r *pluginRegistry) String() string {
return fmt.Sprintf("{plugins=%v}{pluginsFactory=%v}", r.plugins, r.pluginsFactory)
}

//-------------------------- PluginsChain implementation --------------------------

// PluginsChain is a sequence of plugins to be executed in order inside the ext_proc server
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code currently has a single request chain only, consider removing types and variables that suggest multiplicity in order to make this PR smaller and more focused.

type pluginsChain struct {
plugins []string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rational for using named references instead of actual struct references?

}

// NewPluginsChain creates a new PluginsChain instance
func NewPluginsChain() PluginsChain {
return &pluginsChain{
plugins: []string{},
}
}

// AddPlugin adds a plugin to the chain
func (pc *pluginsChain) AddPlugin(typeKey string, r PluginRegistry) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider changing the chain to work with BBRPlugins and not their names.

// check whether this plugin was registered in the registry (i.e., the factory for the plugin exist and an instance was created)
if ok := r.ContainsPlugin(typeKey); !ok {
err := fmt.Errorf("plugin type %s not found", typeKey)
return err
}
pc.plugins = append(pc.plugins, typeKey)

return nil
}

// GetPlugin retrieves the next plugin in the chain by index
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed? What's the caller's use case (as opposed to execute all in chain)?
If you really need iteration, consider a Range() method with a function callback parameter

func (pc *pluginsChain) GetPlugin(index int, r PluginRegistry) (bbrplugins.BBRPlugin, error) {
if index < 0 || index >= len(pc.plugins) {
return nil, fmt.Errorf("plugin index %d out of range", index)
}
plugins := r.GetPlugins()
plugin, ok := plugins[pc.plugins[index]]
if !ok {
return nil, fmt.Errorf("plugin index %d is not found in the registry", index)
}
return plugin, nil
}

// Length returns the number of plugins in the chain
func (pc *pluginsChain) Length() int {
return len(pc.plugins)
}

// AddPluginInOrder inserts a plugin into the chain in the specified index
func (pc *pluginsChain) AddPluginAtInd(typeKey string, i int, r PluginRegistry) error {
if i < 0 || i > len(pc.plugins) {
return fmt.Errorf("index %d is out of range", i)
}
// validate that the plugin is registered
plugins := r.GetPlugins()
if _, ok := plugins[pc.plugins[i]]; !ok {
return fmt.Errorf("plugin index %d is not found in the registry", i)
}
pc.plugins = append(pc.plugins[:i], append([]string{typeKey}, pc.plugins[i:]...)...)
return nil
}

func (pc *pluginsChain) GetPlugins() []string {
return pc.plugins
}

// MergeMaps copies all key/value pairs from src into dst and returns dst.
// If dst is nil a new map is allocated.
// Existing keys in dst are not overwritten.
// This is a helper function used to merge headers from multiple plugins safely.
func MergeMaps(dst map[string]string, src map[string]string) map[string]string {
if src == nil {
if dst == nil {
return map[string]string{}
}
return dst
}
if dst == nil {
dst = make(map[string]string, len(src))
}

for k, v := range src {
if _, exists := dst[k]; !exists {
dst[k] = v
}
}

return dst
}

func (pc *pluginsChain) Run(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use unnamed returns and a single prototype line

bodyBytes []byte,
r PluginRegistry,
) (headers map[string]string, mutateBodyBytes []byte, err error) {

allHeaders := make(map[string]string)
mutatedBodyBytes := bodyBytes

for i := range pc.Length() {
plugin, _ := pc.GetPlugin(i, r)
pluginType := plugin.TypedName().Type

metExtPlugin, err := r.GetPlugin(pluginType)

if err != nil {
return allHeaders, bodyBytes, err
}

// The plugin i in the chain receives the (potentially mutated) body and headers from plugin i-1 in the chain
headers, mutatedBodyBytes, err := metExtPlugin.Execute(mutatedBodyBytes)

if err != nil {
return headers, mutatedBodyBytes, err
}

//note that the existing overlapping keys are NOT over-written by merge
MergeMaps(allHeaders, headers)
}
return allHeaders, mutatedBodyBytes, nil
}

func (pc *pluginsChain) String() string {
return fmt.Sprintf("PluginsChain{plugins=%v}", pc.plugins)
}

// -------------------------- End of PluginsChain implementation --------------------------
Loading