-
Notifications
You must be signed in to change notification settings - Fork 219
Pluggable bbr framework initial PR #1981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,6 +1,6 @@ | ||||||
| # Body-Based Routing | ||||||
| This package provides an extension that can be deployed to write the `model` | ||||||
| HTTP body parameter as a header (X-Gateway-Model-Name) so as to enable routing capabilities on the | ||||||
| By deafult this package provides a plugable extension that can be to set the `model` | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| HTTP body parameter as a header (`X-Gateway-Model-Name`) so as to enable routing capabilities on the | ||||||
| model name. | ||||||
|
|
||||||
| As per OpenAI spec, it is standard for the model name to be included in the | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| /* | ||
| Copyright 2025 The Kubernetes Authors. | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package framework | ||
|
|
||
| import ( | ||
| bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: do not introduce import aliases when not required for clarity or by name conflict |
||
| ) | ||
|
|
||
| const ( | ||
| RequestPluginChain = "REQUEST_PLUGINS_CHAIN" | ||
| ResponsePluginChain = "RESPONSE_PLUGINS_CHAIN" | ||
| ) | ||
|
|
||
| // placeholder for Plugin constructors | ||
| type PluginFactoryFunc func() bbrplugins.BBRPlugin //any no-argument function that returns bbrplugins.BBRPlugin can be assigned to this type (including a constructor function) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: please take a look at definition of factories in EPP? |
||
|
|
||
| // ----------------------- Registry Interface -------------------------------------------------- | ||
| // PluginRegistry defines operations for managing plugin factories and plugin instances | ||
| type PluginRegistry interface { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See notes on proposal with respect to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For brevity, minimize repitition of comments made on the proposal's code sections. |
||
| RegisterFactory(typeKey string, factory PluginFactoryFunc) error //constructors | ||
| RegisterPlugin(plugin bbrplugins.BBRPlugin) error //registers a plugin instance (the instance is supposed to be created via the factory first) | ||
| GetFactory(typeKey string) (PluginFactoryFunc, error) | ||
| GetPlugin(typeKey string) (bbrplugins.BBRPlugin, error) | ||
| GetFactories() map[string]PluginFactoryFunc | ||
| GetPlugins() map[string]bbrplugins.BBRPlugin | ||
| ListPlugins() []string | ||
| ListFactories() []string | ||
| CreatePlugin(typeKey string) (bbrplugins.BBRPlugin, error) | ||
| ContainsFactory(typeKey string) bool | ||
| ContainsPlugin(typeKey string) bool | ||
| String() string | ||
| } | ||
|
|
||
| // ------------------------ Ordered Plugins Interface ------------------------------------------ | ||
| // PluginsChain is used to define a specific order of execution of the plugin instances stored in the registry | ||
| type PluginsChain interface { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need interface? is scope right-sized? |
||
| AddPlugin(typeKey string, registry PluginRegistry) error //to be added to the chain the plugin should be registered in the registry first | ||
| AddPluginAtInd(typeKey string, i int, r PluginRegistry) error //only affects the instance of the plugin chain | ||
| GetPlugin(index int, registry PluginRegistry) (bbrplugins.BBRPlugin, error) //retrieves i-th plugin as defined in the chain from the registry | ||
| Length() int | ||
| GetPlugins() []string | ||
| Run(bodyBytes []byte, registry PluginRegistry) (map[string]string, []byte, error) //return potentially mutated body and all headers map safely merged | ||
| String() string | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,276 @@ | ||
| /* | ||
| Copyright 2025 The Kubernetes Authors. | ||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package framework | ||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| bbrplugins "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/plugins" | ||
| ) | ||
|
|
||
| // -------------------- INTERFACES ----------------------------------------------------------------------- | ||
| // Interfaces are defined in "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/framework/interfaces.go" | ||
|
|
||
| // --------------------- PluginRegistry implementation --------------------------------------------------- | ||
|
|
||
| // pluginRegistry implements PluginRegistry | ||
| type pluginRegistry struct { | ||
| pluginsFactory map[string]PluginFactoryFunc //constructors | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggest using |
||
| plugins map[string]bbrplugins.BBRPlugin // instances | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. factories are likley registered on start. Can plugin creation be done in parallel by multiple go routines? If so, consider concurrency control. |
||
| } | ||
|
|
||
| // NewPluginRegistry creates a new instance of pluginRegistry | ||
| func NewPluginRegistry() PluginRegistry { | ||
| return &pluginRegistry{ | ||
| pluginsFactory: make(map[string]PluginFactoryFunc), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggest using |
||
| plugins: make(map[string]bbrplugins.BBRPlugin), | ||
| } | ||
| } | ||
|
|
||
| // Register a plugin factory by type key (e.g., "ModelSelector", "MetadataExtractor") | ||
| func (r *pluginRegistry) RegisterFactory(typeKey string, factory PluginFactoryFunc) error { | ||
| //validate whether already registered | ||
| alreadyRegistered := r.ContainsFactory(typeKey) | ||
| if alreadyRegistered { | ||
| err := fmt.Errorf("factory fot plugin interface type %s is already registered", typeKey) | ||
| return err | ||
| } | ||
| r.pluginsFactory[typeKey] = factory | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Register a plugin instance (created through the appropriate factory) | ||
| func (r *pluginRegistry) RegisterPlugin(plugin bbrplugins.BBRPlugin) error { | ||
| //validate whether this interface is supported | ||
| alreadyRegistered := r.ContainsPlugin(plugin.TypedName().Type) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. coming from EPP, my expectation is that plugin instances have the same type as their factory and that multiple plugin instances (of the same type) may co-exist. |
||
|
|
||
| if alreadyRegistered { | ||
| err := fmt.Errorf("plugin implementing interface type %s is already registered", plugin.TypedName().Type) | ||
| return err | ||
| } | ||
|
|
||
| // validate that the factory for this plugin is registered: always register factory before the plugin | ||
| if _, ok := r.pluginsFactory[plugin.TypedName().Type]; !ok { | ||
| err := fmt.Errorf("no plugin factory registered for plugin interface type %s", plugin.TypedName().Type) | ||
| return err | ||
| } | ||
| r.plugins[plugin.TypedName().Type] = plugin | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Retrieves a plugin factory by type key | ||
| func (r *pluginRegistry) GetFactory(typeKey string) (PluginFactoryFunc, error) { | ||
| if pluginFactory, ok := r.pluginsFactory[typeKey]; ok { | ||
| return pluginFactory, nil | ||
| } | ||
| return nil, fmt.Errorf("plugin type %s not found", typeKey) | ||
| } | ||
|
|
||
| // Retrieves a plugin instance by type key | ||
| func (r *pluginRegistry) GetPlugin(typeKey string) (bbrplugins.BBRPlugin, error) { | ||
| if plugin, ok := r.plugins[typeKey]; ok { | ||
| return plugin, nil | ||
| } | ||
| return nil, fmt.Errorf("plugin type %s not found", typeKey) | ||
| } | ||
|
|
||
| // Constructs a new plugin (a caller can perform either type assertion of a concrete implementation of the BBR plugin) | ||
| func (r *pluginRegistry) CreatePlugin(typeKey string) (bbrplugins.BBRPlugin, error) { | ||
| if factory, ok := r.pluginsFactory[typeKey]; ok { | ||
| plugin := factory() | ||
| return plugin, nil | ||
| } | ||
| return nil, fmt.Errorf("plugin %s not registered", typeKey) | ||
| } | ||
|
|
||
| // Removes a plugin factory by type key | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. needed? |
||
| func (r *pluginRegistry) UnregisterFactory(typeKey string) error { | ||
| if _, ok := r.pluginsFactory[typeKey]; ok { | ||
| delete(r.pluginsFactory, typeKey) | ||
| return nil | ||
| } | ||
| return fmt.Errorf("plugin (%s) not found", typeKey) | ||
| } | ||
|
|
||
| // ListPlugins lists all registered plugins | ||
| func (r *pluginRegistry) ListPlugins() []string { | ||
| typeKeys := make([]string, 0, len(r.plugins)) | ||
| for k := range r.plugins { | ||
| typeKeys = append(typeKeys, k) | ||
| } | ||
| return typeKeys | ||
| } | ||
|
|
||
| // ListPlugins lists all registered plugins; this functionis not really needed. Just for sanity checks and tests | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in that case you can make it private and put tests in the same package? |
||
| func (r *pluginRegistry) ListFactories() []string { | ||
| typeKeys := make([]string, 0, len(r.pluginsFactory)) | ||
| for k := range r.pluginsFactory { | ||
| typeKeys = append(typeKeys, k) | ||
| } | ||
| return typeKeys | ||
| } | ||
|
|
||
| // Get factories | ||
| func (r *pluginRegistry) GetFactories() map[string]PluginFactoryFunc { | ||
| return r.pluginsFactory | ||
| } | ||
|
|
||
| // Get plugins | ||
| func (r *pluginRegistry) GetPlugins() map[string]bbrplugins.BBRPlugin { | ||
| return r.plugins | ||
| } | ||
|
|
||
| // Checks for presense of a factory in this registry | ||
| func (r *pluginRegistry) ContainsFactory(typeKey string) bool { | ||
| _, exists := r.pluginsFactory[typeKey] | ||
| return exists | ||
| } | ||
|
|
||
| // Helper: Checks for presense of a plugin in this registry | ||
| func (r *pluginRegistry) ContainsPlugin(typeKey string) bool { | ||
| _, exists := r.plugins[typeKey] | ||
| return exists | ||
| } | ||
|
|
||
| func (r *pluginRegistry) String() string { | ||
| return fmt.Sprintf("{plugins=%v}{pluginsFactory=%v}", r.plugins, r.pluginsFactory) | ||
| } | ||
|
|
||
| //-------------------------- PluginsChain implementation -------------------------- | ||
|
|
||
| // PluginsChain is a sequence of plugins to be executed in order inside the ext_proc server | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code currently has a single request chain only, consider removing types and variables that suggest multiplicity in order to make this PR smaller and more focused. |
||
| type pluginsChain struct { | ||
| plugins []string | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rational for using named references instead of actual struct references? |
||
| } | ||
|
|
||
| // NewPluginsChain creates a new PluginsChain instance | ||
| func NewPluginsChain() PluginsChain { | ||
| return &pluginsChain{ | ||
| plugins: []string{}, | ||
| } | ||
| } | ||
|
|
||
| // AddPlugin adds a plugin to the chain | ||
| func (pc *pluginsChain) AddPlugin(typeKey string, r PluginRegistry) error { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider changing the chain to work with BBRPlugins and not their names. |
||
| // check whether this plugin was registered in the registry (i.e., the factory for the plugin exist and an instance was created) | ||
| if ok := r.ContainsPlugin(typeKey); !ok { | ||
| err := fmt.Errorf("plugin type %s not found", typeKey) | ||
| return err | ||
| } | ||
| pc.plugins = append(pc.plugins, typeKey) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // GetPlugin retrieves the next plugin in the chain by index | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needed? What's the caller's use case (as opposed to execute all in chain)? |
||
| func (pc *pluginsChain) GetPlugin(index int, r PluginRegistry) (bbrplugins.BBRPlugin, error) { | ||
| if index < 0 || index >= len(pc.plugins) { | ||
| return nil, fmt.Errorf("plugin index %d out of range", index) | ||
| } | ||
| plugins := r.GetPlugins() | ||
| plugin, ok := plugins[pc.plugins[index]] | ||
| if !ok { | ||
| return nil, fmt.Errorf("plugin index %d is not found in the registry", index) | ||
| } | ||
| return plugin, nil | ||
| } | ||
|
|
||
| // Length returns the number of plugins in the chain | ||
| func (pc *pluginsChain) Length() int { | ||
| return len(pc.plugins) | ||
| } | ||
|
|
||
| // AddPluginInOrder inserts a plugin into the chain in the specified index | ||
| func (pc *pluginsChain) AddPluginAtInd(typeKey string, i int, r PluginRegistry) error { | ||
| if i < 0 || i > len(pc.plugins) { | ||
| return fmt.Errorf("index %d is out of range", i) | ||
| } | ||
| // validate that the plugin is registered | ||
| plugins := r.GetPlugins() | ||
| if _, ok := plugins[pc.plugins[i]]; !ok { | ||
| return fmt.Errorf("plugin index %d is not found in the registry", i) | ||
| } | ||
| pc.plugins = append(pc.plugins[:i], append([]string{typeKey}, pc.plugins[i:]...)...) | ||
| return nil | ||
| } | ||
|
|
||
| func (pc *pluginsChain) GetPlugins() []string { | ||
| return pc.plugins | ||
| } | ||
|
|
||
| // MergeMaps copies all key/value pairs from src into dst and returns dst. | ||
| // If dst is nil a new map is allocated. | ||
| // Existing keys in dst are not overwritten. | ||
| // This is a helper function used to merge headers from multiple plugins safely. | ||
| func MergeMaps(dst map[string]string, src map[string]string) map[string]string { | ||
| if src == nil { | ||
| if dst == nil { | ||
| return map[string]string{} | ||
| } | ||
| return dst | ||
| } | ||
| if dst == nil { | ||
| dst = make(map[string]string, len(src)) | ||
| } | ||
|
|
||
| for k, v := range src { | ||
| if _, exists := dst[k]; !exists { | ||
| dst[k] = v | ||
| } | ||
| } | ||
|
|
||
| return dst | ||
| } | ||
|
|
||
| func (pc *pluginsChain) Run( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use unnamed returns and a single prototype line |
||
| bodyBytes []byte, | ||
| r PluginRegistry, | ||
| ) (headers map[string]string, mutateBodyBytes []byte, err error) { | ||
|
|
||
| allHeaders := make(map[string]string) | ||
| mutatedBodyBytes := bodyBytes | ||
|
|
||
| for i := range pc.Length() { | ||
| plugin, _ := pc.GetPlugin(i, r) | ||
| pluginType := plugin.TypedName().Type | ||
|
|
||
| metExtPlugin, err := r.GetPlugin(pluginType) | ||
|
|
||
| if err != nil { | ||
| return allHeaders, bodyBytes, err | ||
| } | ||
|
|
||
| // The plugin i in the chain receives the (potentially mutated) body and headers from plugin i-1 in the chain | ||
| headers, mutatedBodyBytes, err := metExtPlugin.Execute(mutatedBodyBytes) | ||
|
|
||
| if err != nil { | ||
| return headers, mutatedBodyBytes, err | ||
| } | ||
|
|
||
| //note that the existing overlapping keys are NOT over-written by merge | ||
| MergeMaps(allHeaders, headers) | ||
| } | ||
| return allHeaders, mutatedBodyBytes, nil | ||
| } | ||
|
|
||
| func (pc *pluginsChain) String() string { | ||
| return fmt.Sprintf("PluginsChain{plugins=%v}", pc.plugins) | ||
| } | ||
|
|
||
| // -------------------------- End of PluginsChain implementation -------------------------- | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Separate factory registration and plugin creation?