-
-
Notifications
You must be signed in to change notification settings - Fork 24.5k
feat: add Google A2A (Agent2Agent) protocol support #6484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| import { NextFunction, Request, Response } from 'express' | ||
| import a2aService from '../../services/a2a' | ||
| import { RateLimiterManager } from '../../utils/rateLimit' | ||
| import logger from '../../utils/logger' | ||
|
|
||
| /** | ||
| * Extract token from the Authorization: Bearer <token> header. | ||
| * Returns null if not present or malformed. | ||
| */ | ||
| function extractToken(req: Request): string | null { | ||
| const authHeader = req.headers.authorization | ||
| if (!authHeader || !authHeader.startsWith('Bearer ')) return null | ||
| const token = authHeader.slice(7).trim() | ||
| return token.length > 0 ? token : null | ||
| } | ||
|
|
||
| /** | ||
| * Authentication middleware — validates Bearer token and attaches it to res.locals. | ||
| * If A2A_AUTH_DISABLED env var is set to 'true', authentication is bypassed. | ||
| */ | ||
| const authenticateToken = (req: Request, res: Response, next: NextFunction) => { | ||
| if (process.env.A2A_AUTH_DISABLED === 'true') { | ||
| next() | ||
| return | ||
| } | ||
|
|
||
| const token = extractToken(req) | ||
| if (!token) { | ||
| res.status(401).json({ | ||
| jsonrpc: '2.0', | ||
| error: { code: -32001, message: 'Unauthorized: missing or invalid Authorization header. Use Bearer <token>.' }, | ||
| id: null | ||
| }) | ||
| return | ||
| } | ||
| res.locals.token = token | ||
| next() | ||
| } | ||
|
|
||
| /** | ||
| * Rate limiter middleware for A2A endpoint. | ||
| */ | ||
| const getRateLimiterMiddleware = async (req: Request, res: Response, next: NextFunction) => { | ||
| try { | ||
| return RateLimiterManager.getInstance().getRateLimiter()(req, res, next) | ||
| } catch (error) { | ||
| next(error) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handle GET /.well-known/agent-card.json for a specific chatflow. | ||
| * This is the A2A agent discovery endpoint. | ||
| */ | ||
| const handleAgentCard = async (req: Request, res: Response, next: NextFunction) => { | ||
| try { | ||
| const { chatflowId } = req.params | ||
| logger.debug(`[A2A] AgentCard request for chatflow: ${chatflowId}`) | ||
| await a2aService.handleAgentCard(chatflowId, req, res) | ||
| } catch (error) { | ||
| next(error) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handle POST /api/v1/a2a/:chatflowId — A2A JSON-RPC messages. | ||
| * Handles: tasks/send, tasks/sendSubscribe, tasks/get, tasks/cancel, agent/card | ||
| */ | ||
| const handlePost = async (req: Request, res: Response, next: NextFunction) => { | ||
| try { | ||
| const { chatflowId } = req.params | ||
| logger.debug(`[A2A] JSON-RPC request for chatflow: ${chatflowId}`) | ||
| await a2aService.handleJsonRpc(chatflowId, req, res) | ||
| } catch (error) { | ||
| next(error) | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Handle OPTIONS preflight requests for A2A endpoints. | ||
| */ | ||
| const handleOptions = async (req: Request, res: Response) => { | ||
| res.setHeader('Access-Control-Allow-Origin', '*') | ||
| res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') | ||
| res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization') | ||
| res.setHeader('Access-Control-Max-Age', '86400') | ||
| res.status(204).end() | ||
| } | ||
|
|
||
| export default { | ||
| authenticateToken, | ||
| handleAgentCard, | ||
| handlePost, | ||
| handleOptions, | ||
| getRateLimiterMiddleware | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -326,6 +326,28 @@ export class App { | |||||||||
|
|
||||||||||
| this.app.use('/api/v1', flowiseApiV1Router) | ||||||||||
|
|
||||||||||
| // ---------------------------------------- | ||||||||||
| // A2A Agent Discovery — well-known endpoint | ||||||||||
| // Returns AgentCard for a specific chatflow (requires chatflowId query param) | ||||||||||
| // ---------------------------------------- | ||||||||||
| this.app.get('/.well-known/agent-card.json', async (request, response) => { | ||||||||||
| try { | ||||||||||
| const chatflowId = request.query.chatflowId as string | ||||||||||
| if (!chatflowId) { | ||||||||||
| response.status(400).json({ | ||||||||||
| error: 'chatflowId query parameter is required. Example: /.well-known/agent-card.json?chatflowId=<id>' | ||||||||||
| }) | ||||||||||
| return | ||||||||||
| } | ||||||||||
| // Forward to the A2A agent card handler | ||||||||||
| const a2aController = (await import('./controllers/a2a')).default | ||||||||||
| const modifiedReq = { ...request, params: { chatflowId } } as any | ||||||||||
| await a2aController.handleAgentCard(modifiedReq, response, () => {}) | ||||||||||
|
Comment on lines
+344
to
+345
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shallow copying the Express
Suggested change
|
||||||||||
| } catch (error) { | ||||||||||
| response.status(500).json({ error: 'Failed to generate AgentCard' }) | ||||||||||
| } | ||||||||||
| }) | ||||||||||
|
|
||||||||||
| // ---------------------------------------- | ||||||||||
| // Configure number of proxies in Host Environment | ||||||||||
| // ---------------------------------------- | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| import express from 'express' | ||
| import cors from 'cors' | ||
| import a2aController from '../../controllers/a2a' | ||
|
|
||
| const router = express.Router() | ||
|
|
||
| // Body size limit: 10MB max for A2A JSON-RPC payloads | ||
| router.use(express.json({ limit: '10mb', type: 'application/json' })) | ||
|
|
||
| // CORS: Allow all origins for A2A agent-to-agent communication | ||
| // Can be restricted via A2A_CORS_ORIGINS env var | ||
| const a2aCorsOrigins = process.env.A2A_CORS_ORIGINS | ||
| const a2aCorsOptions: cors.CorsOptions = { | ||
| origin: a2aCorsOrigins | ||
| ? a2aCorsOrigins === '*' | ||
| ? true | ||
| : a2aCorsOrigins.split(',').map((o) => o.trim()) | ||
| : true, | ||
| methods: ['GET', 'POST', 'OPTIONS'], | ||
| allowedHeaders: ['Content-Type', 'Authorization'], | ||
| maxAge: 86400 | ||
| } | ||
| router.use(cors(a2aCorsOptions)) | ||
|
|
||
| // Handle preflight for all A2A routes | ||
| router.options('/:chatflowId', a2aController.handleOptions) | ||
|
|
||
| // GET /.well-known/agent-card.json for a specific chatflow | ||
| // AgentCard discovery endpoint | ||
| router.get( | ||
| '/:chatflowId', | ||
| a2aController.authenticateToken, | ||
| a2aController.handleAgentCard | ||
| ) | ||
|
|
||
| // POST — A2A JSON-RPC messages (tasks/send, tasks/sendSubscribe, tasks/get, tasks/cancel) | ||
| router.post( | ||
| '/:chatflowId', | ||
| a2aController.getRateLimiterMiddleware, | ||
| a2aController.authenticateToken, | ||
| a2aController.handlePost | ||
| ) | ||
|
|
||
| export default router |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
getRateLimiterMiddlewareis unnecessarily marked asasyncand returns a Promise. Express middleware should be synchronous unless they perform asynchronous operations. Wrapping the rate limiter call in an async function is redundant.