Streaming events are experimental. The API may change in future releases.
Streaming events are incremental, provider-agnostic notifications emitted during an agent run. They let you update UIs in real time, log progress, or drive streaming transports without depending on vendor-specific chunk formats.
Events are emitted by the agent and transport layers — for example, AgentEventsMiddleware and the HTTP/SSE transport — throughout the run lifecycle.
Event types
All events share a type field. Use match event.type in Python to handle them.
Text message events
| Event | Key fields | Description |
|---|
TEXT_MESSAGE_START | run_id, message_id | Start of a text message |
TEXT_MESSAGE_CONTENT | message_id, delta | Incremental text fragment |
TEXT_MESSAGE_END | message_id | End of the message |
Thinking / reasoning events
Emitted by reasoning models such as o1 and DeepSeek-R1.
| Event | Key fields | Description |
|---|
THINKING_TEXT_MESSAGE_START | run_id, parent_message_id, thinking_message_id | Start of reasoning output |
THINKING_TEXT_MESSAGE_CONTENT | thinking_message_id, delta | Incremental reasoning fragment |
THINKING_TEXT_MESSAGE_END | thinking_message_id | End of reasoning output |
| Event | Key fields | Description |
|---|
TOOL_CALL_START | run_id, tool_call_id, parent_message_id, name | Start of a tool call |
TOOL_CALL_ARGS | tool_call_id, delta | Incremental argument JSON |
TOOL_CALL_END | tool_call_id | End of argument streaming |
TOOL_CALL_RESULT | tool_call_id, content, role | Result after tool execution |
Run lifecycle events
| Event | Key fields | Description |
|---|
RUN_STARTED | agent_id, run_id, root_run_id, timestamp | Run started |
RUN_FINISHED | run_id, result, timestamp | Run completed successfully |
RUN_ERROR | run_id, message, timestamp | Run failed |
TRANSPORT_ERROR | message, timestamp | Transport-level error |
Handler pattern
Events are Pydantic models from nexau.archs.llm.llm_aggregators.events. Match on event.type:
from nexau.archs.llm.llm_aggregators import Event
def handle_event(event: Event) -> None:
match event.type:
case "TEXT_MESSAGE_START":
pass # reset your UI buffer
case "TEXT_MESSAGE_CONTENT":
print(event.delta, end="", flush=True)
case "TEXT_MESSAGE_END":
pass # flush the message
case "TOOL_CALL_START":
print(f"\n[calling {event.name}]")
case "TOOL_CALL_ARGS":
pass # event.tool_call_id, event.delta
case "RUN_FINISHED":
print(f"\nDone: {event.result}")
case "RUN_ERROR":
print(f"Error: {event.message}")
case _:
pass
Always handle RUN_ERROR and TRANSPORT_ERROR — they’re how you surface failures to users and logs.
Using events with HTTP/SSE transport
When your agent is served over HTTP, use SSEClient.stream_events() to consume the event stream:
from nexau.archs.transports.http.sse_client import SSEClient
client = SSEClient("http://localhost:8000")
async for event in client.stream_events(
message="Summarize the latest report",
user_id="user_123",
session_id="sess_456",
):
if event["type"] == "event":
ev = event["event"]
if ev.get("type") == "TEXT_MESSAGE_CONTENT":
print(ev.get("delta", ""), end="", flush=True)
elif event["type"] == "complete":
print(f"\n\n{event.get('response')}")
Each yielded item has a type key. "event" items carry the streaming event payload; "complete" signals the run is done and includes the final response.