Skip to content

feat(auth): Arquitetura de autenticação para agents federados #2

@vavasilva

Description

@vavasilva

Contexto

Com a adoção do MCP Context Forge como gateway central para federação de agents, precisamos definir e implementar a arquitetura de autenticação que suporte:

  1. Autenticação do usuário na borda (Context Forge)
  2. Propagação de identidade para os agents especialistas
  3. Compatibilidade com agents que também funcionam standalone

Arquitetura Proposta

┌────────────────────────────────────────────────────────────────────────────┐
│                           FLUXO DE AUTENTICAÇÃO                            │
├────────────────────────────────────────────────────────────────────────────┤
│                                                                            │
│  ┌─────────┐      OAuth 2.1        ┌──────────────────────────────────┐   │
│  │ Cliente │  (Cognito + Fluig)    │       MCP Context Forge          │   │
│  │         ├──────────────────────►│                                  │   │
│  └─────────┘                       │  1. Valida JWT Cognito           │   │
│                                    │  2. Busca permissões (TCloud API)│   │
│                                    │  3. Cacheia no Redis             │   │
│                                    │  4. Propaga headers p/ agents    │   │
│                                    └───────────────┬──────────────────┘   │
│                                                    │                      │
│                              Headers propagados:   │                      │
│                              X-User-Email          │                      │
│                              X-User-Customers      │                      │
│                                                    ▼                      │
│                                    ┌──────────────────────────────────┐   │
│                                    │      Agent Orquestrador          │   │
│                                    └───────────────┬──────────────────┘   │
│                                                    │                      │
│                    ┌───────────────┬───────────────┴───────────────┐      │
│                    ▼               ▼                               ▼      │
│              ┌──────────┐   ┌──────────┐                    ┌──────────┐  │
│              │ CPU/RAM  │   │    DB    │        ...         │   App    │  │
│              │  Agent   │   │  Agent   │                    │  Agent   │  │
│              └──────────┘   └──────────┘                    └──────────┘  │
│                                                                            │
└────────────────────────────────────────────────────────────────────────────┘

Solução: Plugin de Autenticação para Context Forge

O Context Forge suporta plugins de autenticação via hook http_auth_resolve_user. Vamos criar um plugin customizado.

Estrutura do Plugin

plugins/tcloud_cognito_auth/
├── __init__.py
├── plugin-manifest.yaml
├── tcloud_cognito_auth.py    # Plugin principal
├── cognito.py                # Validação JWT Cognito
├── tcloud_api.py             # Client TCloud API
├── tests/
│   └── test_plugin.py
└── README.md

Fluxo do Plugin

┌─────────────────────────────────────────────────────────────────────────────┐
│                      Plugin: TCloudCognitoAuthPlugin                        │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  Hook: HTTP_AUTH_RESOLVE_USER                                               │
│                                                                             │
│  1. Extrai Bearer token do header Authorization                             │
│  2. Valida JWT com Cognito JWKS                                             │
│     - Verifica issuer (Cognito User Pool)                                   │
│     - Verifica expiração                                                    │
│     - Verifica client_id                                                    │
│  3. Extrai email do usuário (do username ou claim)                          │
│  4. Busca permissões na TCloud API (com cache Redis)                        │
│     - GET /customer → lista de cloud_ids                                    │
│     - Cache TTL: 5 minutos                                                  │
│  5. Retorna user dict + metadata com permissões                             │
│                                                                             │
│  Resultado:                                                                 │
│  - modified_payload: {email, full_name, is_admin, is_active}               │
│  - metadata: {auth_method: "cognito", customers: [...]}                    │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Código do Plugin

# plugins/tcloud_cognito_auth/tcloud_cognito_auth.py

from mcpgateway.plugins.framework import (
    Plugin, PluginConfig, PluginContext, PluginResult,
    HttpAuthResolveUserPayload, HttpHeaderPayload,
    PluginViolation, PluginViolationError,
)
import aiohttp
import jwt
from datetime import datetime, timezone

class TCloudCognitoAuthPlugin(Plugin):
    """Authenticate users via Cognito + TCloud API permissions."""
    
    def __init__(self, config: PluginConfig):
        super().__init__(config)
        self.cognito_region = config.config.get("cognito_region")
        self.cognito_user_pool_id = config.config.get("cognito_user_pool_id")
        self.cognito_client_id = config.config.get("cognito_client_id")
        self.tcloud_api_url = config.config.get("tcloud_api_url")
        self.tcloud_api_key = config.config.get("tcloud_api_key")
        self._jwks_cache = None
        self._session = None
    
    async def initialize(self):
        """Called when plugin is loaded."""
        self._session = aiohttp.ClientSession()
        await self._get_cognito_jwks()  # Pre-fetch JWKS
    
    async def shutdown(self):
        """Called when plugin is unloaded."""
        if self._session:
            await self._session.close()
    
    async def http_auth_resolve_user(
        self,
        payload: HttpAuthResolveUserPayload,
        context: PluginContext
    ) -> PluginResult[dict]:
        """Authenticate via Cognito JWT and fetch TCloud permissions."""
        
        if not payload.credentials:
            return PluginResult(continue_processing=True)
        
        token = payload.credentials.get("credentials")
        if not token:
            return PluginResult(continue_processing=True)
        
        try:
            # 1. Validate JWT with Cognito
            claims = await self._validate_cognito_jwt(token)
            email = self._extract_email(claims)
            
            # 2. Fetch TCloud permissions (with Redis cache)
            customers = await self._get_user_customers(token, context)
            
            self.logger.info(f"Authenticated user {email} with {len(customers)} customers")
            
            # 3. Return authenticated user with permissions
            return PluginResult(
                modified_payload={
                    "email": email,
                    "full_name": claims.get("name", email),
                    "is_admin": False,
                    "is_active": True,
                },
                metadata={
                    "auth_method": "cognito",
                    "customers": customers,
                    "cognito_sub": claims.get("sub"),
                },
                continue_processing=True,
            )
            
        except jwt.ExpiredSignatureError:
            raise PluginViolationError(
                message="Token expired",
                violation=PluginViolation(reason="Token expired", code="TOKEN_EXPIRED")
            )
        except jwt.InvalidTokenError as e:
            raise PluginViolationError(
                message=f"Invalid token: {e}",
                violation=PluginViolation(reason="Invalid token", code="INVALID_TOKEN")
            )
        except Exception as e:
            self.logger.error(f"Auth error: {e}")
            return PluginResult(continue_processing=True)
    
    async def _validate_cognito_jwt(self, token: str) -> dict:
        """Validate JWT against Cognito JWKS."""
        jwks = await self._get_cognito_jwks()
        header = jwt.get_unverified_header(token)
        kid = header.get("kid")
        
        key = None
        for k in jwks["keys"]:
            if k["kid"] == kid:
                key = jwt.algorithms.RSAAlgorithm.from_jwk(k)
                break
        
        if not key:
            raise jwt.InvalidTokenError("Key not found in JWKS")
        
        issuer = f"https://cognito-idp.{self.cognito_region}.amazonaws.com/{self.cognito_user_pool_id}"
        
        claims = jwt.decode(
            token, key, algorithms=["RS256"],
            issuer=issuer, options={"verify_aud": False}
        )
        
        # Verify client_id for access tokens
        if claims.get("token_use") == "access":
            if claims.get("client_id") != self.cognito_client_id:
                raise jwt.InvalidTokenError("Invalid client_id")
        
        return claims
    
    async def _get_cognito_jwks(self) -> dict:
        """Fetch and cache Cognito JWKS."""
        if self._jwks_cache:
            return self._jwks_cache
        
        url = f"https://cognito-idp.{self.cognito_region}.amazonaws.com/{self.cognito_user_pool_id}/.well-known/jwks.json"
        async with self._session.get(url) as resp:
            self._jwks_cache = await resp.json()
            return self._jwks_cache
    
    async def _get_user_customers(self, token: str, context: PluginContext) -> list:
        """Fetch user's customers from TCloud API with Redis cache."""
        import hashlib
        cache_key = f"tcloud:customers:{hashlib.sha256(token.encode()).hexdigest()[:16]}"
        
        # Try cache (via Context Forge Redis)
        try:
            from mcpgateway.cache import redis_cache
            cached = await redis_cache.get(cache_key)
            if cached:
                return cached
        except Exception:
            pass
        
        # Fetch from TCloud API
        headers = {
            "Authorization": f"Bearer {token}",
            "x-api-key": self.tcloud_api_key,
        }
        
        async with self._session.get(f"{self.tcloud_api_url}/customer", headers=headers) as resp:
            if resp.status != 200:
                self.logger.warning(f"TCloud API error: {resp.status}")
                return []
            
            data = await resp.json()
            customers = [c.get("cloud_id") for c in data if c.get("cloud_id")]
            
            # Cache for 5 minutes
            try:
                from mcpgateway.cache import redis_cache
                await redis_cache.set(cache_key, customers, ttl=300)
            except Exception:
                pass
            
            return customers
    
    def _extract_email(self, claims: dict) -> str:
        """Extract email from Cognito claims."""
        if claims.get("token_use") == "access":
            username = claims.get("username", "")
            return username.split("_", 1)[1] if "_" in username else username
        return claims.get("email", claims.get("sub"))

Configuração do Plugin

# infrastructure/context-forge/plugins/config.yaml
plugins:
  - name: "TCloudCognitoAuthPlugin"
    kind: "plugins.tcloud_cognito_auth.tcloud_cognito_auth.TCloudCognitoAuthPlugin"
    description: "Authenticate via Cognito + TCloud API permissions"
    version: "1.0.0"
    author: "TCloud Team"
    hooks: ["http_auth_resolve_user"]
    mode: "enforce"
    priority: 10
    config:
      cognito_region: "sa-east-1"
      cognito_user_pool_id: "${COGNITO_USER_POOL_ID}"
      cognito_client_id: "${COGNITO_APP_CLIENT_ID}"
      tcloud_api_url: "${TCLOUD_API_URL}"
      tcloud_api_key: "${TCLOUD_API_KEY}"

Helm Values para Secrets

# infrastructure/context-forge/values-dev.yaml
mcpContextForge:
  env:
    # Cognito
    COGNITO_REGION: "sa-east-1"
    COGNITO_USER_POOL_ID: "sa-east-1_xxx"
    COGNITO_APP_CLIENT_ID: "xxx"
    # TCloud API
    TCLOUD_API_URL: "https://api.tcloud.cloudtotvs.com.br/dev"
  
  # Secrets via Kubernetes Secret
  secretEnv:
    - name: TCLOUD_API_KEY
      secretName: tcloud-api-credentials
      secretKey: api-key
  
  # Plugin config
  plugins:
    enabled: true
    configFile: /app/plugins/config.yaml

Header Propagation para Agents

O Context Forge pode propagar headers para os agents downstream. Usar o plugin header_injector:

# plugins/config.yaml (adicional)
plugins:
  # ... TCloudCognitoAuthPlugin ...
  
  - name: "TCloudHeaderInjector"
    kind: "plugins.header_injector.header_injector.HeaderInjectorPlugin"
    hooks: ["tool_pre_invoke", "agent_pre_invoke"]
    priority: 20
    config:
      inject_from_auth_metadata: true
      headers:
        X-User-Email: "${auth.email}"
        X-User-Customers: "${auth.metadata.customers}"

Headers Padronizados

Header Descrição Exemplo
X-User-Email Email do usuário autenticado user@totvs.com.br
X-User-Customers Lista de cloud_ids permitidos (JSON) ["cloud_123", "cloud_456"]
X-Request-ID ID único para tracing uuid

Impacto nos Agents

Os agents precisam suportar dual-mode authentication:

  1. Via Context Forge: Lê permissões dos headers X-User-*
  2. Standalone: Valida OAuth e busca permissões na TCloud API

Ver: tcloud-dev/tcloud-watch-mcp-server#2

Tarefas

  • Criar estrutura do plugin em plugins/tcloud_cognito_auth/
  • Implementar validação JWT Cognito
  • Implementar integração com TCloud API
  • Configurar cache Redis para permissões
  • Criar testes unitários
  • Atualizar Helm values com novas env vars
  • Criar Kubernetes Secret para TCLOUD_API_KEY
  • Testar fluxo end-to-end com tcloud-watch-mcp-server
  • Documentar em docs/authentication.md

Referências

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions