feat: add Google A2A (Agent2Agent) protocol support#6484
Conversation
Implements A2A server endpoint exposing Flowise chatflows as A2A-compatible agents with AgentCard discovery. Closes FlowiseAI#4283
There was a problem hiding this comment.
Code Review
This pull request implements Google A2A (Agent-to-Agent) protocol support, adding discovery endpoints, JSON-RPC message handlers, and task management (including streaming via SSE). Key feedback includes fixing a critical runtime crash caused by shallow copying the Express request object, addressing a potential memory leak from an unbounded in-memory task map, removing unused dead code (taskSSEClients), removing redundant async qualifiers on rate-limiting middleware, and validating chatflowId as a UUID to prevent database query failures.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| const modifiedReq = { ...request, params: { chatflowId } } as any | ||
| await a2aController.handleAgentCard(modifiedReq, response, () => {}) |
There was a problem hiding this comment.
Shallow copying the Express request object using { ...request } breaks the prototype chain, meaning helper methods like req.get() are lost. This will cause a runtime crash (TypeError: req.get is not a function) when handleAgentCard tries to determine the protocol. Instead, mutate request.params directly to preserve the prototype chain.
| const modifiedReq = { ...request, params: { chatflowId } } as any | |
| await a2aController.handleAgentCard(modifiedReq, response, () => {}) | |
| request.params = { ...request.params, chatflowId } | |
| await a2aController.handleAgentCard(request, response, () => {}) |
| * Map of active A2A tasks. | ||
| * In production, this should use a persistent store. | ||
| */ | ||
| const activeTasks = new Map<string, Task>() |
There was a problem hiding this comment.
activeTasks is an unbounded in-memory Map that stores task states indefinitely. Because there is no eviction policy, TTL, or cleanup mechanism, this will cause a memory leak over time as more tasks are processed, eventually leading to Out Of Memory (OOM) crashes. Consider using a TTL cache or a periodic cleanup interval to prune old tasks.
| /** | ||
| * Map of task IDs to their SSE response objects. | ||
| */ | ||
| const taskSSEClients = new Map<string, Response>() |
| const getRateLimiterMiddleware = async (req: Request, res: Response, next: NextFunction) => { | ||
| try { | ||
| return RateLimiterManager.getInstance().getRateLimiter()(req, res, next) | ||
| } catch (error) { | ||
| next(error) | ||
| } | ||
| } |
There was a problem hiding this comment.
The getRateLimiterMiddleware is unnecessarily marked as async and returns a Promise. Express middleware should be synchronous unless they perform asynchronous operations. Wrapping the rate limiter call in an async function is redundant.
| const getRateLimiterMiddleware = async (req: Request, res: Response, next: NextFunction) => { | |
| try { | |
| return RateLimiterManager.getInstance().getRateLimiter()(req, res, next) | |
| } catch (error) { | |
| next(error) | |
| } | |
| } | |
| const getRateLimiterMiddleware = (req: Request, res: Response, next: NextFunction) => { | |
| try { | |
| RateLimiterManager.getInstance().getRateLimiter()(req, res, next) | |
| } catch (error) { | |
| next(error) | |
| } | |
| } |
| async function handleAgentCard(chatflowId: string, req: Request, res: Response): Promise<void> { | ||
| try { | ||
| const appServer = getRunningExpressApp() | ||
| const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowId }) |
There was a problem hiding this comment.
The chatflowId parameter is passed directly to findOneBy({ id: chatflowId }) without validation. If the database is PostgreSQL and chatflowId is not a valid UUID format, the query will throw a fatal QueryFailedError (invalid input syntax for type uuid), resulting in an unhandled 500 error instead of a clean 400/404 response. Validate the UUID format before querying.
| async function handleAgentCard(chatflowId: string, req: Request, res: Response): Promise<void> { | |
| try { | |
| const appServer = getRunningExpressApp() | |
| const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowId }) | |
| async function handleAgentCard(chatflowId: string, req: Request, res: Response): Promise<void> { | |
| const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i | |
| if (!uuidRegex.test(chatflowId)) { | |
| res.status(StatusCodes.BAD_REQUEST).json({ | |
| error: 'Invalid chatflowId format: ' + chatflowId | |
| }) | |
| return | |
| } | |
| try { | |
| const appServer = getRunningExpressApp() | |
| const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowId }) |
Implements A2A protocol server for exposing Flowise agents.
Closes #4283