Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions packages/server/src/controllers/a2a/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import { NextFunction, Request, Response } from 'express'
import a2aService from '../../services/a2a'
import { RateLimiterManager } from '../../utils/rateLimit'
import logger from '../../utils/logger'

/**
* Extract token from the Authorization: Bearer <token> header.
* Returns null if not present or malformed.
*/
function extractToken(req: Request): string | null {
const authHeader = req.headers.authorization
if (!authHeader || !authHeader.startsWith('Bearer ')) return null
const token = authHeader.slice(7).trim()
return token.length > 0 ? token : null
}

/**
* Authentication middleware — validates Bearer token and attaches it to res.locals.
* If A2A_AUTH_DISABLED env var is set to 'true', authentication is bypassed.
*/
const authenticateToken = (req: Request, res: Response, next: NextFunction) => {
if (process.env.A2A_AUTH_DISABLED === 'true') {
next()
return
}

const token = extractToken(req)
if (!token) {
res.status(401).json({
jsonrpc: '2.0',
error: { code: -32001, message: 'Unauthorized: missing or invalid Authorization header. Use Bearer <token>.' },
id: null
})
return
}
res.locals.token = token
next()
}

/**
* Rate limiter middleware for A2A endpoint.
*/
const getRateLimiterMiddleware = async (req: Request, res: Response, next: NextFunction) => {
try {
return RateLimiterManager.getInstance().getRateLimiter()(req, res, next)
} catch (error) {
next(error)
}
}
Comment on lines +43 to +49
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The getRateLimiterMiddleware is unnecessarily marked as async and returns a Promise. Express middleware should be synchronous unless they perform asynchronous operations. Wrapping the rate limiter call in an async function is redundant.

Suggested change
const getRateLimiterMiddleware = async (req: Request, res: Response, next: NextFunction) => {
try {
return RateLimiterManager.getInstance().getRateLimiter()(req, res, next)
} catch (error) {
next(error)
}
}
const getRateLimiterMiddleware = (req: Request, res: Response, next: NextFunction) => {
try {
RateLimiterManager.getInstance().getRateLimiter()(req, res, next)
} catch (error) {
next(error)
}
}


/**
* Handle GET /.well-known/agent-card.json for a specific chatflow.
* This is the A2A agent discovery endpoint.
*/
const handleAgentCard = async (req: Request, res: Response, next: NextFunction) => {
try {
const { chatflowId } = req.params
logger.debug(`[A2A] AgentCard request for chatflow: ${chatflowId}`)
await a2aService.handleAgentCard(chatflowId, req, res)
} catch (error) {
next(error)
}
}

/**
* Handle POST /api/v1/a2a/:chatflowId — A2A JSON-RPC messages.
* Handles: tasks/send, tasks/sendSubscribe, tasks/get, tasks/cancel, agent/card
*/
const handlePost = async (req: Request, res: Response, next: NextFunction) => {
try {
const { chatflowId } = req.params
logger.debug(`[A2A] JSON-RPC request for chatflow: ${chatflowId}`)
await a2aService.handleJsonRpc(chatflowId, req, res)
} catch (error) {
next(error)
}
}

/**
* Handle OPTIONS preflight requests for A2A endpoints.
*/
const handleOptions = async (req: Request, res: Response) => {
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization')
res.setHeader('Access-Control-Max-Age', '86400')
res.status(204).end()
}

export default {
authenticateToken,
handleAgentCard,
handlePost,
handleOptions,
getRateLimiterMiddleware
}
22 changes: 22 additions & 0 deletions packages/server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,28 @@ export class App {

this.app.use('/api/v1', flowiseApiV1Router)

// ----------------------------------------
// A2A Agent Discovery — well-known endpoint
// Returns AgentCard for a specific chatflow (requires chatflowId query param)
// ----------------------------------------
this.app.get('/.well-known/agent-card.json', async (request, response) => {
try {
const chatflowId = request.query.chatflowId as string
if (!chatflowId) {
response.status(400).json({
error: 'chatflowId query parameter is required. Example: /.well-known/agent-card.json?chatflowId=<id>'
})
return
}
// Forward to the A2A agent card handler
const a2aController = (await import('./controllers/a2a')).default
const modifiedReq = { ...request, params: { chatflowId } } as any
await a2aController.handleAgentCard(modifiedReq, response, () => {})
Comment on lines +344 to +345
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Shallow copying the Express request object using { ...request } breaks the prototype chain, meaning helper methods like req.get() are lost. This will cause a runtime crash (TypeError: req.get is not a function) when handleAgentCard tries to determine the protocol. Instead, mutate request.params directly to preserve the prototype chain.

Suggested change
const modifiedReq = { ...request, params: { chatflowId } } as any
await a2aController.handleAgentCard(modifiedReq, response, () => {})
request.params = { ...request.params, chatflowId }
await a2aController.handleAgentCard(request, response, () => {})

} catch (error) {
response.status(500).json({ error: 'Failed to generate AgentCard' })
}
})

// ----------------------------------------
// Configure number of proxies in Host Environment
// ----------------------------------------
Expand Down
44 changes: 44 additions & 0 deletions packages/server/src/routes/a2a/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import express from 'express'
import cors from 'cors'
import a2aController from '../../controllers/a2a'

const router = express.Router()

// Body size limit: 10MB max for A2A JSON-RPC payloads
router.use(express.json({ limit: '10mb', type: 'application/json' }))

// CORS: Allow all origins for A2A agent-to-agent communication
// Can be restricted via A2A_CORS_ORIGINS env var
const a2aCorsOrigins = process.env.A2A_CORS_ORIGINS
const a2aCorsOptions: cors.CorsOptions = {
origin: a2aCorsOrigins
? a2aCorsOrigins === '*'
? true
: a2aCorsOrigins.split(',').map((o) => o.trim())
: true,
methods: ['GET', 'POST', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization'],
maxAge: 86400
}
router.use(cors(a2aCorsOptions))

// Handle preflight for all A2A routes
router.options('/:chatflowId', a2aController.handleOptions)

// GET /.well-known/agent-card.json for a specific chatflow
// AgentCard discovery endpoint
router.get(
'/:chatflowId',
a2aController.authenticateToken,
a2aController.handleAgentCard
)

// POST — A2A JSON-RPC messages (tasks/send, tasks/sendSubscribe, tasks/get, tasks/cancel)
router.post(
'/:chatflowId',
a2aController.getRateLimiterMiddleware,
a2aController.authenticateToken,
a2aController.handlePost
)

export default router
2 changes: 2 additions & 0 deletions packages/server/src/routes/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import express from 'express'
import a2aRouter from './a2a'
import agentflowv2GeneratorRouter from './agentflowv2-generator'
import apikeyRouter from './apikey'
import assistantsRouter from './assistants'
Expand Down Expand Up @@ -133,6 +134,7 @@ router.use('/text-to-speech', textToSpeechRouter)
router.use('/custom-mcp-servers', customMcpServersRouter)
router.use('/mcp-server', mcpServerRouter)
router.use('/mcp', mcpEndpointRouter)
router.use('/a2a', a2aRouter)

router.use('/auth', authRouter)
router.use('/audit', IdentityManager.checkFeatureByPlan('feat:login-activity'), auditRouter)
Expand Down
Loading