Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api/oss/src/apis/fastapi/webhooks/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import List, Optional, Union

from pydantic import BaseModel

Expand All @@ -24,6 +24,10 @@ class WebhookSubscriptionEditRequest(BaseModel):
subscription: WebhookSubscriptionEdit


class WebhookSubscriptionDraftTestRequest(BaseModel):
subscription: Union[WebhookSubscriptionEdit, WebhookSubscriptionCreate]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Pydantic Union always resolves to WebhookSubscriptionEdit, making WebhookSubscriptionCreate unreachable

In WebhookSubscriptionDraftTestRequest, the field subscription: Union[WebhookSubscriptionEdit, WebhookSubscriptionCreate] will always deserialize any payload as WebhookSubscriptionEdit, because all of its extra fields (from Identifier: id: Optional[UUID] = None, from Lifecycle: created_at: Optional[datetime] = None, etc.) have Optional defaults. Pydantic v2 tries left-to-right and WebhookSubscriptionEdit succeeds for every valid WebhookSubscriptionCreate payload, so the WebhookSubscriptionCreate branch is dead code.

In practice, the service code at api/oss/src/core/webhooks/service.py:178 uses getattr(subscription, "id", None) which returns None for create payloads (since Identifier.id defaults to None), so runtime behavior is correct by coincidence. However, this also means create-style draft test payloads now silently accept Lifecycle fields (created_at, deleted_at, deleted_by_id, etc.) that should not be part of a create request, weakening input validation.

Prompt for agents
In api/oss/src/apis/fastapi/webhooks/models.py line 28, the Union[WebhookSubscriptionEdit, WebhookSubscriptionCreate] is non-discriminable because WebhookSubscriptionEdit's extra fields (id, created_at, etc.) are all Optional with defaults. Any WebhookSubscriptionCreate payload also validates as WebhookSubscriptionEdit, so the Create branch is never reached.

To fix this, use a Pydantic discriminated union with an explicit discriminator field (e.g. a literal 'mode' field), or use a single model that has id as Optional and let the service decide behavior based on whether id is present (which is what the code already effectively does). The simplest fix may be to just use WebhookSubscriptionEdit directly (since that's what Pydantic always produces anyway) and document that id=None means 'create draft' while id=<uuid> means 'edit draft'.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.



class WebhookSubscriptionQueryRequest(BaseModel):
subscription: Optional[WebhookSubscriptionQuery] = None

Expand Down
41 changes: 41 additions & 0 deletions api/oss/src/apis/fastapi/webhooks/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from oss.src.apis.fastapi.webhooks.models import (
WebhookSubscriptionCreateRequest,
WebhookSubscriptionDraftTestRequest,
WebhookSubscriptionEditRequest,
WebhookSubscriptionQueryRequest,
WebhookSubscriptionResponse,
Expand Down Expand Up @@ -62,6 +63,15 @@ def __init__(
response_model_exclude_none=True,
status_code=status.HTTP_200_OK,
)
self.router.add_api_route(
"/subscriptions/test-draft",
self.test_draft_webhook,
methods=["POST"],
operation_id="test_webhook_draft",
response_model=WebhookDeliveryResponse,
response_model_exclude_none=True,
status_code=status.HTTP_200_OK,
)
self.router.add_api_route(
"/subscriptions/{subscription_id}",
self.fetch_subscription,
Expand Down Expand Up @@ -469,6 +479,37 @@ async def query_deliveries(

# --- WEBHOOK TESTS ------------------------------------------------------ #

@intercept_exceptions()
async def test_draft_webhook(
self,
request: Request,
*,
body: WebhookSubscriptionDraftTestRequest,
) -> WebhookDeliveryResponse:
if is_ee():
has_permission = await check_action_access(
user_uid=str(request.state.user_id),
project_id=str(request.state.project_id),
permission=Permission.EDIT_WEBHOOKS,
)
if not has_permission:
raise FORBIDDEN_EXCEPTION # type: ignore

try:
delivery = await self.webhooks_service.test_draft_webhook(
project_id=UUID(request.state.project_id),
subscription=body.subscription,
)
except WebhookAuthorizationSecretRequiredError as e:
raise HTTPException(status_code=400, detail=e.message) from e
except WebhookSubscriptionNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e)) from e

return WebhookDeliveryResponse(
count=1 if delivery else 0,
delivery=delivery,
)

@intercept_exceptions()
async def test_webhook(
self,
Expand Down
219 changes: 219 additions & 0 deletions api/oss/src/core/webhooks/delivery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import hashlib
import hmac
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from uuid import UUID

import httpx

from agenta.sdk.workflows.handlers import resolve_json_selector

from oss.src.core.webhooks.types import (
EVENT_CONTEXT_FIELDS,
SUBSCRIPTION_CONTEXT_FIELDS,
WEBHOOK_TIMEOUT,
WebhookDeliveryData,
WebhookEventType,
)
from oss.src.core.webhooks.utils import validate_webhook_url
from oss.src.utils.crypting import decrypt
from oss.src.utils.logging import get_module_logger

log = get_module_logger(__name__)

MAX_RESOLVE_DEPTH = 10

NON_OVERRIDABLE_HEADERS = {
"content-type",
"content-length",
"host",
"user-agent",
"x-agenta-event-type",
"x-agenta-delivery-id",
"x-agenta-event-id",
"x-agenta-signature",
"idempotency-key",
"authorization",
}

REDACTED_HEADERS = {
"authorization",
"x-agenta-signature",
}

REDACTED_VALUE = "[REDACTED]"


@dataclass
class PreparedWebhookRequest:
typed_event_type: Optional[WebhookEventType]
data: WebhookDeliveryData
payload_json: str
request_headers: dict[str, str]


class PreparedWebhookRequestError(ValueError):
def __init__(self, message: str, *, data: WebhookDeliveryData):
super().__init__(message)
self.data = data


def _redact_headers(headers: dict[str, str]) -> dict[str, str]:
return {
key: (REDACTED_VALUE if key.lower() in REDACTED_HEADERS else value)
for key, value in headers.items()
}


def _merge_headers(
*,
user_headers: Optional[dict],
system_headers: dict[str, str],
) -> dict[str, str]:
merged: dict[str, str] = {}
dropped: list[str] = []

for key, value in (user_headers or {}).items():
key_str = str(key)
if key_str.lower() in NON_OVERRIDABLE_HEADERS:
dropped.append(key_str)
continue
merged[key_str] = str(value)

if dropped:
log.warning(
"[WEBHOOKS DELIVERY] Dropped non-overwritable user headers: %s",
", ".join(sorted(set(dropped))),
)

merged.update(system_headers)
return merged


def resolve_payload_fields(
fields: Any,
context: Dict[str, Any],
*,
_depth: int = 0,
) -> Any:
if _depth > MAX_RESOLVE_DEPTH:
return None
if isinstance(fields, dict):
return {
k: resolve_payload_fields(v, context, _depth=_depth + 1)
for k, v in fields.items()
}
if isinstance(fields, list):
return [
resolve_payload_fields(item, context, _depth=_depth + 1) for item in fields
]
try:
return resolve_json_selector(fields, context)
except Exception:
return None


def prepare_webhook_request(
*,
project_id: UUID,
delivery_id: UUID,
event_id: UUID,
event_type: str,
url: str,
headers: dict,
payload_fields: Optional[Dict[str, Any]],
auth_mode: Optional[str],
event: Dict[str, Any],
subscription: Dict[str, Any],
encrypted_secret: str,
) -> PreparedWebhookRequest:
try:
typed_event_type = WebhookEventType(event_type)
except ValueError:
log.warning(
"[WEBHOOKS DELIVERY] Unrecognized event_type %r — storing None in delivery data",
event_type,
)
typed_event_type = None

context = {
"event": {k: v for k, v in event.items() if k in EVENT_CONTEXT_FIELDS},
"subscription": {
k: v for k, v in subscription.items() if k in SUBSCRIPTION_CONTEXT_FIELDS
},
"scope": {"project_id": str(project_id)},
}

resolved_fields = payload_fields if payload_fields is not None else "$"
payload = resolve_payload_fields(resolved_fields, context)

base_data = WebhookDeliveryData(
event_type=typed_event_type,
url=url,
payload=payload,
)

try:
validate_webhook_url(url)
except ValueError as exc:
raise PreparedWebhookRequestError(str(exc), data=base_data) from exc

signing_secret = decrypt(encrypted_secret)
resolved_auth_mode = auth_mode or "signature"
payload_json = json.dumps(payload, sort_keys=True, separators=(",", ":"))
timestamp = str(int(datetime.now(timezone.utc).timestamp()))

if resolved_auth_mode == "authorization":
system_headers = {
"Content-Type": "application/json",
"User-Agent": "Agenta-Webhook/1.0",
"X-Agenta-Event-Type": event_type,
"X-Agenta-Delivery-Id": str(delivery_id),
"X-Agenta-Event-Id": str(event_id),
"Idempotency-Key": str(delivery_id),
"Authorization": signing_secret,
}
else:
to_sign = f"{timestamp}.{payload_json}"
signature = hmac.new(
key=signing_secret.encode("utf-8"),
msg=to_sign.encode("utf-8"),
digestmod=hashlib.sha256,
).hexdigest()
system_headers = {
"Content-Type": "application/json",
"User-Agent": "Agenta-Webhook/1.0",
"X-Agenta-Event-Type": event_type,
"X-Agenta-Delivery-Id": str(delivery_id),
"X-Agenta-Event-Id": str(event_id),
"Idempotency-Key": str(delivery_id),
"X-Agenta-Signature": f"t={timestamp},v1={signature}",
}

request_headers = _merge_headers(
user_headers=headers,
system_headers=system_headers,
)

return PreparedWebhookRequest(
typed_event_type=typed_event_type,
data=base_data.model_copy(update={"headers": _redact_headers(request_headers)}),
payload_json=payload_json,
request_headers=request_headers,
)


async def send_webhook_request(
*,
url: str,
payload_json: str,
headers: dict[str, str],
) -> httpx.Response:
async with httpx.AsyncClient(timeout=WEBHOOK_TIMEOUT) as client:
return await client.post(
url,
content=payload_json,
headers=headers,
)
Loading
Loading