refactor: extract event_loop_utils module and improve agent response handling #999
Conversation
|
@a7m-1st @fengju0213 could you please review the changes and let me know your feedback? thanks! |
Force-pushed from d81f18c to 35f4ca2
The camel library throws `'AsyncChatCompletionStreamManager' object has no attribute 'choices'` when streaming is enabled for agents with tools attached.

Changes:
- Disable streaming for all `astep` calls in `ListenChatAgent` to avoid the error
- Preserve `model_config_dict` when cloning agents (for future streaming support)
- Add streaming infrastructure (`ActionStreamingAgentOutputData`) for when camel library support is available
- Handle `AsyncStreamingChatAgentResponse` in `single_agent_worker.py`

Note: Streaming text output for worker agents is blocked by an upstream camel library limitation. Task decomposition streaming still works via `decompose_text`.
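The "preserve `model_config_dict` when cloning" point can be sketched as follows. This is a hypothetical toy, not the PR's actual code: `FakeBackend`, `FakeAgent`, and the attribute paths are stand-ins; the real agent and backend live in camel.

```python
class FakeBackend:
    """Stand-in for a camel model backend holding config overrides."""
    def __init__(self, model_config_dict):
        self.model_config_dict = model_config_dict

class FakeAgent:
    """Toy agent showing why a clone must copy the config dict."""
    def __init__(self, backend):
        self.model_backend = backend

    def clone(self):
        # Copy the config dict so an override like 'stream' set on one
        # agent does not leak into (or from) its clones.
        new_backend = FakeBackend(dict(self.model_backend.model_config_dict))
        return FakeAgent(new_backend)

original = FakeAgent(FakeBackend({"stream": True}))
worker = original.clone()
worker.model_backend.model_config_dict["stream"] = False
# original still has {"stream": True}; worker has {"stream": False}
```

If the clone shared the same dict instance, disabling streaming on a worker would silently disable it on the original agent too.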
Shows real-time `[Tool] Shell Exec...`, `[Tool] Write File...` etc. messages in the UI while tools are executing, giving users feedback on agent activity.

Changes:
- Add `ActionStreamingAgentOutputData` events in the `@listen_toolkit` decorator
- Add streaming events in `_execute_tool` and `_aexecute_tool` methods
- Disable model streaming for worker agents (camel library limitation)
- Replace emoji with `[Tool]` text for better compatibility

Closes eigent-ai#87
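The `@listen_toolkit` mechanism described above can be sketched roughly like this. Everything here is a hedged stand-in, not the PR's real code: `emit_event` and the `events` list substitute for the actual `ActionStreamingAgentOutputData` SSE pipeline, and `shell_exec` is a dummy tool.

```python
import functools

events = []  # stand-in for the SSE stream to the frontend

def emit_event(message: str) -> None:
    # Assumed helper: in the real PR this would push an
    # ActionStreamingAgentOutputData event to the UI.
    events.append(message)

def listen_toolkit(func):
    """Emit a '[Tool] ...' message before the wrapped tool runs."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # e.g. "[Tool] Shell Exec..." while the tool is executing
        emit_event(f"[Tool] {func.__name__.replace('_', ' ').title()}...")
        return func(*args, **kwargs)
    return wrapper

@listen_toolkit
def shell_exec(command: str) -> str:
    return f"ran: {command}"

shell_exec("ls")  # events now holds "[Tool] Shell Exec..."
```

The key design choice is that the event fires before the tool runs, so the user sees activity even for long-running tools.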
|
@a7m-1st I added some changes and updated the description. This PR adds some real-time feedback while tasks are running. Before, you'd just see the task start and then wait until it finished. Now you'll see messages like `[Tool] Shell Exec...` while tools are executing. I originally wanted to stream the model's actual thinking text as well, but that's blocked by the upstream camel library limitation. |
|
I see, awesome then. Thanks @MkDev11, I need some time as I'm a little occupied with other tasks. I will catch up with this PR once I get the chance. |
|
Cool!
|
|
Hi there @Wendong-Fan, I think this would be a cool feature. Maybe Douglas can confirm if the UI meets expectations? |
|
@MkDev11 thanks for your contribution! I noticed that you mentioned the camel library throws `AsyncChatCompletionStreamManager` errors when streaming with tools; however, camel normally supports tool calls in streaming mode. Could you please tell me under what circumstances you encounter the error? |
Hey @fengju0213, good catch - my comment in the code was a bit misleading. It's not streaming + tools that's the issue, it's streaming + `response_format` (structured output). Here's what happens:

```python
model = ModelFactory.create(
    model_platform=ModelPlatformType.OPENAI,
    model_type=ModelType.GPT_4O_MINI,
    model_config_dict={'stream': True}
)
agent = ChatAgent(system_message='...', model=model)

# this breaks
response = await agent.astep('query', response_format=SomePydanticModel)
async for chunk in response:
    print(chunk)
```

You get: `'AsyncChatCompletionStreamManager' object has no attribute 'choices'`

The problem is in the structured-output path when streaming is enabled. So I just disable streaming when we need structured output:

```python
worker_agent.model_backend.model_config_dict["stream"] = False
response = await worker_agent.astep(prompt, response_format=TaskResult)
```

Let me know what you think |
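When streaming does work, the worker still has to consume the chunked response. A minimal sketch of the `AsyncStreamingChatAgentResponse` handling with `last_chunk` tracking mentioned in this PR; the class below is my own stand-in for camel's real one, assuming only that it is an async iterable whose final chunk carries the complete message:

```python
import asyncio

class AsyncStreamingChatAgentResponse:
    """Stand-in for camel's streaming response type (assumption:
    an async iterable; the final chunk is the complete message)."""
    def __init__(self, chunks):
        self._chunks = chunks

    def __aiter__(self):
        self._iter = iter(self._chunks)
        return self

    async def __anext__(self):
        try:
            return next(self._iter)
        except StopIteration:
            raise StopAsyncIteration

async def collect_response(response):
    # single_agent_worker.py-style handling: iterate all chunks and
    # keep only the last one; non-streaming responses pass through.
    if isinstance(response, AsyncStreamingChatAgentResponse):
        last_chunk = None
        async for chunk in response:
            last_chunk = chunk
        return last_chunk
    return response

result = asyncio.run(collect_response(
    AsyncStreamingChatAgentResponse(["partial", "final"])))
```

The pass-through branch matters because the same code path receives plain `ChatAgentResponse` objects when streaming is disabled.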
The issue is streaming + response_format (structured output), not streaming + tools
Includes fix for AsyncChatCompletionStreamManager from PR #3743
I believe we can simplify the configs now because it has been fixed @MkDev11, or are they unrelated? i.e. the manual setting of stream mode, and on clone too |
Remove specific mention of stream setting since the workaround for AsyncChatCompletionStreamManager is no longer needed.
yep, I've simplified it! Removed the workarounds from both |
|
@a7m-1st please review the changes again |
|
All right sure, let me run it through the debugger to reconfirm the flow. |
Force-pushed from 800cebd to 523ba97
|
hello @Wendong-Fan, I am really sorry for tagging you, but could you please give me an update? |
|
Hi there @MkDev11 , I have just pushed my enhancements, apologies for the huge delays.
|
…s and isolated functions

- Remove `streaming_agent_output` SSE step and `ActionStreamingAgentOutputData`
- Remove `_send_streaming_chunk` method and `[Tool]` display calls
- Revert frontend files to upstream/main (no streaming UI changes)
- Keep refactored utilities: `event_loop_utils`, `_schedule_async_task`, `_send_agent_deactivate`, `_extract_tokens`, `_stream_chunks`
Wendong-Fan
left a comment
Thanks for the cleanup! The extraction of event loop utils and the streaming response improvements look good.
Found a couple of things:
- `_send_agent_deactivate` uses `asyncio.create_task()`, but this method gets called from streaming generators' `finally` blocks, which can run in worker threads. Should use `_schedule_async_task()` instead (same as `_execute_tool` does).
- In `astep()`, deactivation only fires when `res is not None`. On exceptions, `res` stays `None`, so no deactivation is sent and the frontend thinks the agent is still running. The `step()` method handles this correctly by sending deactivation unconditionally for all non-streaming cases. Would be good to match that pattern here.
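The second review point amounts to moving deactivation into a `finally` block. A hedged sketch, assuming hypothetical stand-ins: `send_agent_deactivate` substitutes for `ListenChatAgent._send_agent_deactivate`, and this toy `astep` only mimics the control flow, not the real method:

```python
import asyncio

events = []

def send_agent_deactivate():
    # Stand-in for ListenChatAgent._send_agent_deactivate.
    events.append("deactivate")

async def astep(fail: bool):
    res = None
    try:
        if fail:
            raise RuntimeError("model call failed")
        res = "ok"
        return res
    finally:
        # Fire unconditionally, matching step(): on an exception res is
        # still None, but the frontend must learn the agent stopped.
        send_agent_deactivate()

asyncio.run(astep(fail=False))
try:
    asyncio.run(astep(fail=True))
except RuntimeError:
    pass
```

With the original `if res is not None` guard, the failing call would skip deactivation and leave the UI showing a running agent forever.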
|
added commit f7ab0f8 based on review |
Resolved conflicts in:
- app/agent/agent_model.py: Keep event_loop_utils import approach
- app/agent/listen_chat_agent.py: Keep helper methods and cleaner formatting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
|
Thanks @Wendong-Fan for your review, and thanks @MkDev11 for the PR |

Summary
Refactors event loop utilities and agent streaming response handling for improved code organization and thread-safety.
Changes
New Module: `event_loop_utils.py`
- Moved `set_main_event_loop()` and `_schedule_async_task()` into a dedicated utility module

Refactored `listen_chat_agent.py`
- Use `_schedule_async_task` instead of `asyncio.create_task` for toolkit activation/deactivation calls
- Fixed `AsyncStreamingChatAgentResponse` handling to properly iterate through chunks

Improved `single_agent_worker.py`
- Added `last_chunk` tracking

Cleanup
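As a rough idea of what the extracted event loop utilities might look like: the function names `set_main_event_loop` and `_schedule_async_task` come from this PR, but the bodies below are my own sketch of the standard thread-safe pattern, not the actual implementation.

```python
import asyncio
import threading

_main_loop = None  # the loop that owns SSE/WebSocket traffic

def set_main_event_loop(loop):
    """Record the main event loop so worker threads can reach it."""
    global _main_loop
    _main_loop = loop

def _schedule_async_task(coro):
    # asyncio.create_task() fails in worker threads (no running loop);
    # run_coroutine_threadsafe hands the coroutine to the main loop
    # and returns a concurrent.futures.Future.
    return asyncio.run_coroutine_threadsafe(coro, _main_loop)

# Demo: run a loop in a background thread, schedule onto it from here.
results = []

async def deactivate(agent_id):
    results.append(f"deactivated {agent_id}")

loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()
set_main_event_loop(loop)

# Safe to call from any thread, e.g. a streaming generator's finally block.
_schedule_async_task(deactivate("worker-1")).result(timeout=5)
loop.call_soon_threadsafe(loop.stop)
```

This is why the review asked for `_schedule_async_task` over `asyncio.create_task` in `_send_agent_deactivate`: the latter raises when called from a thread with no running loop.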