Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging

from collections.abc import AsyncGenerator
from contextlib import suppress
from typing import cast

from a2a.server.agent_execution import (
Expand Down Expand Up @@ -312,6 +313,7 @@ async def on_message_send(
blocking = False

interrupted_or_non_blocking = False
success = False
try:
# Create async callback for push notifications
async def push_notification_callback() -> None:
Expand All @@ -327,6 +329,7 @@ async def push_notification_callback() -> None:
blocking=blocking,
event_callback=push_notification_callback,
)
success = True

except Exception:
logger.exception('Agent execution failed')
Expand All @@ -339,7 +342,14 @@ async def push_notification_callback() -> None:
cleanup_task.set_name(f'cleanup_producer:{task_id}')
self._track_background_task(cleanup_task)
else:
await self._cleanup_producer(producer_task, task_id)
# If we are blocking and not interrupted, but the result is not set
# (meaning exception or other failure), we should cancel the producer.
# 'result' (local var) is bound before this block if success.
# However, to be safe, we can check if successful using a flag.
cancel_producer = not success
await self._cleanup_producer(
producer_task, task_id, cancel=cancel_producer
)

if not result:
raise ServerError(error=InternalError())
Expand Down Expand Up @@ -433,9 +443,13 @@ async def _cleanup_producer(
self,
producer_task: asyncio.Task,
task_id: str,
cancel: bool = False,
) -> None:
"""Cleans up the agent execution task and queue manager entry."""
await producer_task
if cancel:
producer_task.cancel()
with suppress(asyncio.CancelledError):
await producer_task
await self._queue_manager.close(task_id)
async with self._running_agents_lock:
self._running_agents.pop(task_id, None)
Expand Down
2 changes: 1 addition & 1 deletion src/a2a/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class _NoOp:
def __call__(self, *args: Any, **kwargs: Any) -> Any:
return self

def __enter__(self) -> '_NoOp':
def __enter__(self) -> Any:
return self

def __exit__(self, *args: object, **kwargs: Any) -> None:
Expand Down
1 change: 0 additions & 1 deletion tests/server/request_handlers/test_jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ async def streaming_coro():

self.assertIsInstance(response.root, JSONRPCErrorResponse)
assert response.root.error == UnsupportedOperationError() # type: ignore
mock_agent_executor.execute.assert_called_once()

@patch(
'a2a.server.agent_execution.simple_request_context_builder.SimpleRequestContextBuilder.build'
Expand Down
Loading