
refactor: extract event_loop_utils module and improve agent response handling#999

Merged
Wendong-Fan merged 38 commits into eigent-ai:main from MkDev11:feature/streaming-agent-output
Feb 3, 2026

Conversation

@MkDev11
Contributor

@MkDev11 MkDev11 commented Jan 21, 2026

Summary

Refactors event loop utilities and agent streaming response handling for improved code organization and thread-safety.

Changes

New Module: event_loop_utils.py

  • Extracted set_main_event_loop() and _schedule_async_task() into a dedicated utility module
  • Provides thread-safe async task scheduling from worker threads using contextvars + global fallback

Refactored listen_chat_agent.py

  • Added reusable helper methods:
    • _send_agent_deactivate() - Sends agent deactivation events
    • _extract_tokens() - Extracts token count from responses
    • _stream_chunks() / _astream_chunks() - Wraps streaming responses for consistent handling
  • Fixed thread-safety: Uses _schedule_async_task instead of asyncio.create_task for toolkit activation/deactivation calls
  • Fixed AsyncStreamingChatAgentResponse handling to properly iterate through chunks

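A minimal sketch of what a `_stream_chunks`-style wrapper could look like. Only the method names `_stream_chunks` and `_send_agent_deactivate` come from the PR; the class, its synchronous stand-in for the deactivation send, and the chunk type are illustrative assumptions.

```python
from typing import Iterator, List


class StreamingAgentMixin:
    """Hypothetical sketch; real helper names are from the PR description."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def _send_agent_deactivate(self) -> None:
        # In the real code this schedules an async send via
        # _schedule_async_task, because generator finally blocks can run
        # in worker threads where asyncio.create_task would fail.
        self.events.append("deactivate")

    def _stream_chunks(self, response: Iterator[str]) -> Iterator[str]:
        """Yield chunks, guaranteeing exactly one deactivation on exit."""
        try:
            for chunk in response:
                yield chunk
        finally:
            self._send_agent_deactivate()
```

Wrapping every streaming response this way is what makes the handling "consistent": cleanup fires whether the consumer exhausts the stream, breaks out early, or an exception propagates.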
Improved single_agent_worker.py

  • Better streaming response handling with last_chunk tracking
  • Properly captures usage info from streaming responses after iteration

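The `last_chunk` pattern can be sketched like this. The `Chunk` dataclass and `consume_stream` function are stand-ins, assuming (as in many streaming APIs) that usage info is only attached to the final chunk; camel's actual response objects differ.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Tuple


@dataclass
class Chunk:
    """Stand-in for a streaming chunk; camel's real type differs."""
    text: str
    usage: Optional[dict] = None


async def consume_stream(
    stream: AsyncIterator[Chunk],
) -> Tuple[str, Optional[dict]]:
    """Accumulate text and track the last chunk, whose usage info is
    only populated once the stream is exhausted."""
    text, last_chunk = "", None
    async for chunk in stream:
        text += chunk.text
        last_chunk = chunk  # keep overwriting; final chunk carries usage
    usage = last_chunk.usage if last_chunk else None
    return text, usage
```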
Cleanup

  • Removed duplicate code

@Wendong-Fan Wendong-Fan requested review from a7m-1st and fengju0213 and removed request for a7m-1st January 21, 2026 13:35
@MkDev11
Contributor Author

MkDev11 commented Jan 21, 2026

@a7m-1st @fengju0213 could you please review the changes and let me know your feedback? thanks!

@a7m-1st
Collaborator

a7m-1st commented Jan 21, 2026

Thanks for the PR @MkDev11, I don't think I can manage to test this by today (UTC+3).
But have you taken a look at #767? It's kind of similar.

Also, UI-wise, can you attach a screenshot of which part you are streaming exactly? Just to get a quicker picture!

@MkDev11 MkDev11 closed this Jan 22, 2026
@MkDev11 MkDev11 force-pushed the feature/streaming-agent-output branch from d81f18c to 35f4ca2 Compare January 22, 2026 02:08
@MkDev11 MkDev11 reopened this Jan 22, 2026
The camel library throws 'AsyncChatCompletionStreamManager object has no
attribute choices' when streaming is enabled for agents with tools attached.

Changes:
- Disable streaming for all astep calls in ListenChatAgent to avoid the error
- Preserve model_config_dict when cloning agents (for future streaming support)
- Add streaming infrastructure (ActionStreamingAgentOutputData) for when
  camel library support is available
- Handle AsyncStreamingChatAgentResponse in single_agent_worker.py

Note: Streaming text output for worker agents is blocked by upstream camel
library limitation. Task decomposition streaming still works via decompose_text.
Shows real-time '[Tool] Shell Exec...', '[Tool] Write File...' etc. messages
in the UI while tools are executing, giving users feedback on agent activity.

Changes:
- Add ActionStreamingAgentOutputData events in @listen_toolkit decorator
- Add streaming events in _execute_tool and _aexecute_tool methods
- Disable model streaming for worker agents (camel library limitation)
- Replace emoji with [Tool] text for better compatibility

Closes eigent-ai#87
@MkDev11 MkDev11 changed the title feat: implement streaming output for agent responses feat: add streaming tool activity messages during task execution Jan 22, 2026
@MkDev11
Contributor Author

MkDev11 commented Jan 22, 2026

@a7m-1st I added some changes and updated the description. This PR adds some real-time feedback while tasks are running.

Before, you'd just see the task start and then wait until it finished. Now you'll see messages like [Tool] Shell Exec... pop up as the agent works, so you know what's actually happening behind the scenes.

I originally wanted to stream the model's actual thinking text (like ChatGPT does), but hit a wall - the camel library we use crashes when you try to stream with tools enabled. Since pretty much every agent uses tools, I pivoted to showing tool activity instead. Not quite what issue #87 asked for, but it's still a nice improvement.

@MkDev11
Contributor Author

MkDev11 commented Jan 22, 2026

streaming messages

@a7m-1st
Collaborator

a7m-1st commented Jan 22, 2026

I see, awesome then. Thanks @MkDev11. I need some time as I am a little occupied with other tasks; I will catch up with this PR once I get the chance.

@MkDev11
Contributor Author

MkDev11 commented Jan 22, 2026 via email

@a7m-1st
Collaborator

a7m-1st commented Jan 22, 2026

Hi there @Wendong-Fan, I think this would be a cool feature; maybe Douglas can confirm whether the UI meets expectations?

@fengju0213
Collaborator

@MkDev11 thanks for your contribution! I noticed you mentioned that the camel library throws AsyncChatCompletionStreamManager errors when streaming with tools; however, camel normally supports tool calls in streaming mode. Could you please tell me under what circumstances you encountered the error?

@MkDev11
Contributor Author

MkDev11 commented Jan 23, 2026

@MkDev11 thanks for your contribution! I noticed you mentioned that the camel library throws AsyncChatCompletionStreamManager errors when streaming with tools; however, camel normally supports tool calls in streaming mode. Could you please tell me under what circumstances you encountered the error?

Hey @fengju0213, good catch - my comment in the code was a bit misleading. It's not streaming + tools that's the issue, it's streaming + response_format (structured output).

Here's what happens:

from camel.agents import ChatAgent
from camel.models import ModelFactory
from camel.types import ModelPlatformType, ModelType

model = ModelFactory.create(
    model_platform=ModelPlatformType.OPENAI,
    model_type=ModelType.GPT_4O_MINI,
    model_config_dict={'stream': True}
)
agent = ChatAgent(system_message='...', model=model)

# this breaks: streaming combined with structured output
response = await agent.astep('query', response_format=SomePydanticModel)
async for chunk in response:
    print(chunk)

You get:

AttributeError: 'AsyncChatCompletionStreamManager' object has no attribute 'choices'

The problem is in _handle_batch_response (chat_agent.py:3743) - it tries to do for choice in response.choices but OpenAI returns an AsyncChatCompletionStreamManager when you combine streaming with structured output, and that object doesn't have .choices.

So I just disable streaming when we need structured output:

worker_agent.model_backend.model_config_dict["stream"] = False
response = await worker_agent.astep(prompt, response_format=TaskResult)

Let me know what you think

The issue is streaming + response_format (structured output), not streaming + tools
Includes fix for AsyncChatCompletionStreamManager from PR #3743
@a7m-1st
Collaborator

a7m-1st commented Jan 26, 2026

Hopefully we can resolve this in camel-ai/camel#3743

I believe we can simplify the configs now because it has been fixed @MkDev11, or are they unrelated? i.e. the manual setting of stream mode, and on clone too

Remove specific mention of stream setting since the workaround
for AsyncChatCompletionStreamManager is no longer needed.
@MkDev11
Contributor Author

MkDev11 commented Jan 26, 2026

Hopefully we can resolve this in camel-ai/camel#3743

I believe we can simplify the configs now because it has been fixed @MkDev11, or are they unrelated? i.e. the manual setting of stream mode, and on clone too

yep, I've simplified it! Removed the workarounds from both agent.py and single_agent_worker.py, and updated camel-ai to 0.2.85 which includes the fix. The stream mode settings and clone preservation are intentional config, not workarounds, so those stay.

@MkDev11
Contributor Author

MkDev11 commented Jan 26, 2026

@a7m-1st please review the changes again

@a7m-1st
Collaborator

a7m-1st commented Jan 27, 2026

All right, sure, let me run it through the debugger to reconfirm the flow.
Aside from this, if @Wendong-Fan and @Douglasymlai find it a cool feature, I think it should be good 👍

@MkDev11 MkDev11 force-pushed the feature/streaming-agent-output branch from 800cebd to 523ba97 Compare January 30, 2026 22:08
@MkDev11
Contributor Author

MkDev11 commented Feb 1, 2026

hello @Wendong-Fan, I am really sorry for tagging you; could you please give me an update?

@a7m-1st
Collaborator

a7m-1st commented Feb 3, 2026

Hi there @MkDev11, I have just pushed my enhancements, apologies for the huge delay.

  • simplified set_main_event_loop & _schedule_async_task to remove redundancy
  • isolated the logic into _send_streaming_chunk, _send_agent_deactivate, _extract_tokens & _stream_chunks

…s and isolated functions

- Remove streaming_agent_output SSE step and ActionStreamingAgentOutputData
- Remove _send_streaming_chunk method and [Tool] display calls
- Revert frontend files to upstream/main (no streaming UI changes)
- Keep refactored utilities: event_loop_utils, _schedule_async_task, _send_agent_deactivate, _extract_tokens, _stream_chunks
@MkDev11 MkDev11 changed the title feat: add streaming tool activity messages during task execution refactor: extract event_loop_utils module and improve agent response handling Feb 3, 2026
Contributor

@Wendong-Fan Wendong-Fan left a comment


Thanks for the cleanup! The extraction of event loop utils and the streaming response improvements look good.

Found a couple of things:

  1. _send_agent_deactivate uses asyncio.create_task() but this method gets called from streaming generators' finally blocks which can run in worker threads. Should use _schedule_async_task() instead (same as _execute_tool does).

  2. In astep(), deactivation only fires when res is not None. On exceptions, res stays None so no deactivation is sent and the frontend thinks the agent is still running. The step() method handles this correctly by sending deactivation unconditionally for all non-streaming cases. Would be good to match that pattern here.
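Point 2 can be illustrated with a try/finally sketch. This is a minimal stand-in, not the PR's actual `astep` (which also handles streaming cases and uses the thread-safe scheduler); the `Agent` class and its event list are hypothetical scaffolding.

```python
import asyncio


class Agent:
    """Minimal stand-in to illustrate unconditional deactivation."""

    def __init__(self) -> None:
        self.events = []

    async def _send_agent_deactivate(self) -> None:
        self.events.append("deactivate")

    async def astep(self, fail: bool = False) -> str:
        res = None
        try:
            if fail:
                raise RuntimeError("model error")
            res = "ok"
            return res
        finally:
            # Fire even when an exception left res as None, so the
            # frontend never thinks the agent is still running.
            await self._send_agent_deactivate()
```

With an `if res is not None:` guard instead of the `finally`, the deactivate event would be silently skipped on any exception, which is exactly the frontend hang described above.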

@Wendong-Fan
Contributor

added commit f7ab0f8 based on review

Wendong-Fan and others added 2 commits February 3, 2026 22:01
Resolved conflicts in:
- app/agent/agent_model.py: Keep event_loop_utils import approach
- app/agent/listen_chat_agent.py: Keep helper methods and cleaner formatting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
@Wendong-Fan Wendong-Fan added this to the Sprint 13 milestone Feb 3, 2026
@Wendong-Fan Wendong-Fan merged commit 7c287fe into eigent-ai:main Feb 3, 2026
5 of 6 checks passed
@a7m-1st
Collaborator

a7m-1st commented Feb 3, 2026

Thanks @Wendong-Fan for your review, and thanks @MkDev11 for the PR

