Skip to content

Commit fa6f830

Browse files
committed
expose agents via mcp in controller http server and add test
Signed-off-by: Jet Chiang <pokyuen.jetchiang-ext@solo.io>
1 parent 02f2956 commit fa6f830

4 files changed

Lines changed: 478 additions & 0 deletions

File tree

go/internal/httpserver/server.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/kagent-dev/kagent/go/internal/a2a"
1010
"github.com/kagent-dev/kagent/go/internal/database"
1111
"github.com/kagent-dev/kagent/go/internal/httpserver/handlers"
12+
"github.com/kagent-dev/kagent/go/internal/mcp"
1213
common "github.com/kagent-dev/kagent/go/internal/utils"
1314
"github.com/kagent-dev/kagent/go/internal/version"
1415
"github.com/kagent-dev/kagent/go/pkg/auth"
@@ -35,6 +36,7 @@ const (
3536
APIPathMemories = "/api/memories"
3637
APIPathNamespaces = "/api/namespaces"
3738
APIPathA2A = "/api/a2a"
39+
APIPathMCP = "/api/mcp"
3840
APIPathFeedback = "/api/feedback"
3941
APIPathLangGraph = "/api/langgraph"
4042
APIPathCrewAI = "/api/crewai"
@@ -51,6 +53,7 @@ type ServerConfig struct {
5153
BindAddr string
5254
KubeClient ctrl_client.Client
5355
A2AHandler a2a.A2AHandlerMux
56+
MCPHandler *mcp.MCPHandler
5457
WatchedNamespaces []string
5558
DbClient database.Client
5659
Authenticator auth.AuthProvider
@@ -225,6 +228,11 @@ func (s *HTTPServer) setupRoutes() {
225228
// A2A
226229
s.router.PathPrefix(APIPathA2A + "/{namespace}/{name}").Handler(s.config.A2AHandler)
227230

231+
// MCP
232+
if s.config.MCPHandler != nil {
233+
s.router.PathPrefix(APIPathMCP).Handler(s.config.MCPHandler)
234+
}
235+
228236
// Use middleware for common functionality
229237
s.router.Use(auth.AuthnMiddleware(s.authenticator))
230238
s.router.Use(contentTypeMiddleware)

go/internal/mcp/mcp_handler.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package mcp
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"strings"
8+
"sync"
9+
"time"
10+
11+
"github.com/google/uuid"
12+
"github.com/kagent-dev/kagent/go/api/v1alpha2"
13+
"github.com/kagent-dev/kagent/go/internal/a2a"
14+
"github.com/kagent-dev/kagent/go/internal/version"
15+
"github.com/kagent-dev/kagent/go/pkg/auth"
16+
"github.com/mark3labs/mcp-go/mcp"
17+
mcpserver "github.com/mark3labs/mcp-go/server"
18+
"sigs.k8s.io/controller-runtime/pkg/client"
19+
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
20+
a2aclient "trpc.group/trpc-go/trpc-a2a-go/client"
21+
"trpc.group/trpc-go/trpc-a2a-go/protocol"
22+
)
23+
24+
// MCPHandler handles MCP requests and bridges them to A2A endpoints
25+
type MCPHandler struct {
26+
kubeClient client.Client
27+
a2aBaseURL string
28+
authenticator auth.AuthProvider
29+
httpServer *mcpserver.StreamableHTTPServer
30+
lock sync.RWMutex
31+
// Map to store context IDs per session and agent
32+
contextBySessionAndAgent sync.Map
33+
}
34+
35+
// NewMCPHandler creates a new MCP handler
36+
// Wraps the StreamableHTTPServer handler adds A2A bridging and context management.
37+
func NewMCPHandler(kubeClient client.Client, a2aBaseURL string, authenticator auth.AuthProvider) (*MCPHandler, error) {
38+
handler := &MCPHandler{
39+
kubeClient: kubeClient,
40+
a2aBaseURL: a2aBaseURL,
41+
authenticator: authenticator,
42+
}
43+
44+
// Create MCP server with tools and session cleanup hooks
45+
hooks := &mcpserver.Hooks{}
46+
hooks.AddOnUnregisterSession(func(ctx context.Context, session mcpserver.ClientSession) {
47+
sessionID := session.SessionID()
48+
handler.contextBySessionAndAgent.Range(func(key, _ any) bool {
49+
keyStr, ok := key.(string)
50+
if !ok {
51+
return true
52+
}
53+
if strings.HasPrefix(keyStr, sessionID+"|") {
54+
handler.contextBySessionAndAgent.Delete(key)
55+
}
56+
return true
57+
})
58+
})
59+
60+
s := mcpserver.NewMCPServer(
61+
"kagent-agents",
62+
version.Version,
63+
mcpserver.WithToolCapabilities(false),
64+
mcpserver.WithHooks(hooks),
65+
)
66+
67+
// Add list_agents tool
68+
s.AddTool(mcp.NewTool("list_agents",
69+
mcp.WithDescription("List invokable kagent agents (accepted + deploymentReady)"),
70+
), handler.handleListAgents)
71+
72+
// Add invoke_agent tool
73+
s.AddTool(mcp.NewTool("invoke_agent",
74+
mcp.WithDescription("Invoke a kagent agent via A2A"),
75+
mcp.WithString("agent", mcp.Description("Agent name (or namespace/name)"), mcp.Required()),
76+
mcp.WithString("task", mcp.Description("Task to run"), mcp.Required()),
77+
), handler.handleInvokeAgent)
78+
79+
// Create HTTP server
80+
handler.httpServer = mcpserver.NewStreamableHTTPServer(s)
81+
82+
return handler, nil
83+
}
84+
85+
// handleListAgents handles the list_agents MCP tool
86+
func (h *MCPHandler) handleListAgents(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
87+
log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "list_agents")
88+
89+
agentList := &v1alpha2.AgentList{}
90+
if err := h.kubeClient.List(ctx, agentList); err != nil {
91+
return mcp.NewToolResultErrorFromErr("list agents", err), nil
92+
}
93+
94+
type agentSummary struct {
95+
Ref string `json:"ref"`
96+
Description string `json:"description,omitempty"`
97+
}
98+
99+
agents := make([]agentSummary, 0)
100+
for _, agent := range agentList.Items {
101+
// Check if agent is accepted and deployment ready
102+
deploymentReady := false
103+
accepted := false
104+
for _, condition := range agent.Status.Conditions {
105+
if condition.Type == "Ready" && condition.Reason == "DeploymentReady" && condition.Status == "True" {
106+
deploymentReady = true
107+
}
108+
if condition.Type == "Accepted" && condition.Status == "True" {
109+
accepted = true
110+
}
111+
}
112+
113+
if !accepted || !deploymentReady {
114+
continue
115+
}
116+
117+
ref := agent.Namespace + "/" + agent.Name
118+
description := agent.Spec.Description
119+
agents = append(agents, agentSummary{
120+
Ref: ref,
121+
Description: description,
122+
})
123+
}
124+
125+
log.Info("Listed agents", "count", len(agents))
126+
if len(agents) == 0 {
127+
return mcp.NewToolResultStructured(map[string]any{"agents": agents}, "No invokable agents found."), nil
128+
}
129+
130+
var fallbackText strings.Builder
131+
for i, agent := range agents {
132+
if i > 0 {
133+
fallbackText.WriteByte('\n')
134+
}
135+
fallbackText.WriteString(agent.Ref)
136+
if agent.Description != "" {
137+
fallbackText.WriteString(" - ")
138+
fallbackText.WriteString(agent.Description)
139+
}
140+
}
141+
142+
return mcp.NewToolResultStructured(map[string]any{"agents": agents}, fallbackText.String()), nil
143+
}
144+
145+
// handleInvokeAgent handles the invoke_agent MCP tool
146+
func (h *MCPHandler) handleInvokeAgent(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
147+
log := ctrllog.FromContext(ctx).WithName("mcp-handler").WithValues("tool", "invoke_agent")
148+
149+
agentRef, err := request.RequireString("agent")
150+
if err != nil {
151+
return mcp.NewToolResultError(err.Error()), nil
152+
}
153+
154+
task, err := request.RequireString("task")
155+
if err != nil {
156+
return mcp.NewToolResultError(err.Error()), nil
157+
}
158+
159+
// Parse agent reference (namespace/name or just name)
160+
agentNS, agentName, ok := strings.Cut(agentRef, "/")
161+
if !ok {
162+
return mcp.NewToolResultError("agent must be in format 'namespace/name'"), nil
163+
}
164+
agentRef = agentNS + "/" + agentName
165+
166+
// Get session ID from context if available
167+
sessionID := ""
168+
if session := mcpserver.ClientSessionFromContext(ctx); session != nil {
169+
sessionID = session.SessionID()
170+
} else if headerSessionID := request.Header.Get(mcpserver.HeaderKeySessionID); headerSessionID != "" {
171+
sessionID = headerSessionID
172+
}
173+
if sessionID == "" {
174+
sessionID = uuid.New().String()
175+
}
176+
177+
// Get or create context ID for this session and agent
178+
contextKey := sessionID + "|" + agentRef
179+
var contextIDPtr *string
180+
if prior, ok := h.contextBySessionAndAgent.Load(contextKey); ok {
181+
if priorStr, ok := prior.(string); ok && priorStr != "" {
182+
contextIDPtr = &priorStr
183+
}
184+
}
185+
186+
// Create A2A client
187+
a2aURL := fmt.Sprintf("%s/%s/", h.a2aBaseURL, agentRef)
188+
a2aClient, err := a2aclient.NewA2AClient(a2aURL, a2aclient.WithTimeout(30*time.Second))
189+
if err != nil {
190+
log.Error(err, "Failed to create A2A client", "agent", agentRef)
191+
return mcp.NewToolResultErrorFromErr("a2a client", err), nil
192+
}
193+
194+
// Send message via A2A
195+
result, err := a2aClient.SendMessage(ctx, protocol.SendMessageParams{
196+
Message: protocol.Message{
197+
Kind: protocol.KindMessage,
198+
Role: protocol.MessageRoleUser,
199+
ContextID: contextIDPtr,
200+
Parts: []protocol.Part{protocol.NewTextPart(task)},
201+
},
202+
})
203+
if err != nil {
204+
log.Error(err, "Failed to send A2A message", "agent", agentRef)
205+
return mcp.NewToolResultErrorFromErr("a2a send", err), nil
206+
}
207+
208+
// Extract response text and context ID
209+
var responseText, newContextID string
210+
switch a2aResult := result.Result.(type) {
211+
case *protocol.Message:
212+
responseText = a2a.ExtractText(*a2aResult)
213+
if a2aResult.ContextID != nil {
214+
newContextID = *a2aResult.ContextID
215+
}
216+
case *protocol.Task:
217+
newContextID = a2aResult.ContextID
218+
if a2aResult.Status.Message != nil {
219+
responseText = a2a.ExtractText(*a2aResult.Status.Message)
220+
}
221+
for _, artifact := range a2aResult.Artifacts {
222+
responseText += a2a.ExtractText(protocol.Message{Parts: artifact.Parts})
223+
}
224+
}
225+
226+
if responseText == "" {
227+
raw, err := result.MarshalJSON()
228+
if err != nil {
229+
return mcp.NewToolResultErrorFromErr("marshal result", err), nil
230+
}
231+
responseText = string(raw)
232+
}
233+
234+
// Store new context ID if available
235+
if newContextID != "" {
236+
h.contextBySessionAndAgent.Store(contextKey, newContextID)
237+
}
238+
239+
log.Info("Invoked agent", "agent", agentRef, "hasContextID", newContextID != "")
240+
return mcp.NewToolResultStructured(map[string]any{
241+
"agent": agentRef,
242+
"text": responseText,
243+
}, responseText), nil
244+
}
245+
246+
// ServeHTTP implements http.Handler interface
247+
func (h *MCPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
248+
// The MCP HTTP server handles all the routing internally
249+
h.httpServer.ServeHTTP(w, r)
250+
}
251+
252+
// Shutdown gracefully shuts down the MCP handler
253+
func (h *MCPHandler) Shutdown(ctx context.Context) error {
254+
return h.httpServer.Shutdown(ctx)
255+
}

go/pkg/app/app.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838

3939
"github.com/kagent-dev/kagent/go/internal/a2a"
4040
"github.com/kagent-dev/kagent/go/internal/database"
41+
"github.com/kagent-dev/kagent/go/internal/mcp"
4142
versionmetrics "github.com/kagent-dev/kagent/go/internal/metrics"
4243

4344
"github.com/kagent-dev/kagent/go/internal/controller/reconciler"
@@ -450,6 +451,17 @@ func Start(getExtensionConfig GetExtensionConfig) {
450451
os.Exit(1)
451452
}
452453

454+
// Create MCP handler that bridges to A2A
455+
mcpHandler, err := mcp.NewMCPHandler(
456+
mgr.GetClient(),
457+
cfg.A2ABaseUrl+httpserver.APIPathA2A,
458+
extensionCfg.Authenticator,
459+
)
460+
if err != nil {
461+
setupLog.Error(err, "unable to create MCP handler")
462+
os.Exit(1)
463+
}
464+
453465
// +kubebuilder:scaffold:builder
454466
if metricsCertWatcher != nil {
455467
setupLog.Info("Adding metrics certificate watcher to manager")
@@ -486,6 +498,7 @@ func Start(getExtensionConfig GetExtensionConfig) {
486498
BindAddr: cfg.HttpServerAddr,
487499
KubeClient: mgr.GetClient(),
488500
A2AHandler: a2aHandler,
501+
MCPHandler: mcpHandler,
489502
WatchedNamespaces: watchNamespacesList,
490503
DbClient: dbClient,
491504
Authorizer: extensionCfg.Authorizer,

0 commit comments

Comments
 (0)