Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/a2a/server/tasks/base_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,26 @@ async def _dispatch_notification(
) -> bool:
url = push_info.url
try:
headers = None
headers = {}
if push_info.token:
headers = {'X-A2A-Notification-Token': push_info.token}
headers['X-A2A-Notification-Token'] = push_info.token

# Add authentication header if configured
if push_info.authentication and push_info.authentication.schemes:
for scheme in push_info.authentication.schemes:
if (
scheme.lower() == 'bearer'
and push_info.authentication.credentials
):
headers['Authorization'] = (
f'Bearer {push_info.authentication.credentials}'
)
break

response = await self._client.post(
url,
json=task.model_dump(mode='json', exclude_none=True),
headers=headers,
headers=headers if headers else None,
)
response.raise_for_status()
logger.info(
Expand Down
90 changes: 89 additions & 1 deletion tests/server/tasks/test_push_notification_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
BasePushNotificationSender,
)
from a2a.types import (
PushNotificationAuthenticationInfo,
PushNotificationConfig,
Task,
TaskState,
Expand All @@ -29,8 +30,11 @@ def create_sample_push_config(
url: str = 'http://example.com/callback',
config_id: str = 'cfg1',
token: str | None = None,
authentication: PushNotificationAuthenticationInfo | None = None,
) -> PushNotificationConfig:
return PushNotificationConfig(id=config_id, url=url, token=token)
return PushNotificationConfig(
id=config_id, url=url, token=token, authentication=authentication
)


class TestBasePushNotificationSender(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -92,6 +96,90 @@ async def test_send_notification_with_token_success(self) -> None:
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_bearer_authentication(self) -> None:
task_id = 'task_send_bearer_auth'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Bearer'], credentials='test-jwt-token'
)
config = create_sample_push_config(
url='http://notify.me/here',
token='unique_token',
authentication=auth_info,
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

self.mock_config_store.get_info.assert_awaited_once_with(task_id)

# assert httpx_client post method got invoked with right parameters
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers={
'X-A2A-Notification-Token': 'unique_token',
'Authorization': 'Bearer test-jwt-token',
},
)
mock_response.raise_for_status.assert_called_once()

async def test_send_notification_with_bearer_authentication_no_credentials(
self,
) -> None:
task_id = 'task_send_bearer_no_creds'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Bearer'], credentials=None
)
config = create_sample_push_config(
url='http://notify.me/here', authentication=auth_info
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

# Should not add Authorization header when credentials are missing
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers=None,
)

async def test_send_notification_with_non_bearer_authentication(
self,
) -> None:
task_id = 'task_send_non_bearer'
task_data = create_sample_task(task_id=task_id)
auth_info = PushNotificationAuthenticationInfo(
schemes=['Basic'], credentials='user:pass'
)
config = create_sample_push_config(
url='http://notify.me/here', authentication=auth_info
)
self.mock_config_store.get_info.return_value = [config]

mock_response = AsyncMock(spec=httpx.Response)
mock_response.status_code = 200
self.mock_httpx_client.post.return_value = mock_response

await self.sender.send_notification(task_data)

# Should not add Authorization header for non-Bearer schemes
self.mock_httpx_client.post.assert_awaited_once_with(
config.url,
json=task_data.model_dump(mode='json', exclude_none=True),
headers=None,
)

async def test_send_notification_no_config(self) -> None:
task_id = 'task_send_no_config'
task_data = create_sample_task(task_id=task_id)
Expand Down
Loading