Streaming events are experimental. The API may change in future releases.
Streaming events are incremental, provider-agnostic notifications emitted during an agent run. They let you update UIs in real time, log progress, or drive streaming transports without depending on vendor-specific chunk formats. Events are emitted by the agent and transport layers — for example, AgentEventsMiddleware and the HTTP/SSE transport — throughout the run lifecycle.

Event types

All events share a type field. Use match event.type in Python to handle them.

Text message events

| Event | Key fields | Description |
| --- | --- | --- |
| TEXT_MESSAGE_START | run_id, message_id | Start of a text message |
| TEXT_MESSAGE_CONTENT | message_id, delta | Incremental text fragment |
| TEXT_MESSAGE_END | message_id | End of the message |
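Because content events carry only a message_id and a delta, a consumer that wants whole messages has to buffer fragments itself. The following is a minimal sketch of that idea, not part of the library: `TextAssembler` is a hypothetical helper, and the events are `SimpleNamespace` stand-ins that expose only the fields listed in the table above, so the example runs without the framework installed.

```python
from collections import defaultdict
from types import SimpleNamespace


class TextAssembler:
    """Hypothetical helper: joins TEXT_MESSAGE_CONTENT deltas per message_id."""

    def __init__(self) -> None:
        self._buffers = defaultdict(list)  # message_id -> list of fragments
        self.completed = {}                # message_id -> full text

    def feed(self, event) -> None:
        if event.type == "TEXT_MESSAGE_START":
            # Start a fresh buffer for this message
            self._buffers[event.message_id] = []
        elif event.type == "TEXT_MESSAGE_CONTENT":
            self._buffers[event.message_id].append(event.delta)
        elif event.type == "TEXT_MESSAGE_END":
            # Join the fragments and release the buffer
            fragments = self._buffers.pop(event.message_id, [])
            self.completed[event.message_id] = "".join(fragments)


# Stand-in events (assumed shapes, not real library objects)
events = [
    SimpleNamespace(type="TEXT_MESSAGE_START", run_id="run_1", message_id="msg_1"),
    SimpleNamespace(type="TEXT_MESSAGE_CONTENT", message_id="msg_1", delta="Hello, "),
    SimpleNamespace(type="TEXT_MESSAGE_CONTENT", message_id="msg_1", delta="world"),
    SimpleNamespace(type="TEXT_MESSAGE_END", message_id="msg_1"),
]

assembler = TextAssembler()
for ev in events:
    assembler.feed(ev)

print(assembler.completed["msg_1"])
```

Keying the buffers by message_id keeps the helper correct even if fragments from more than one message are interleaved in the stream.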

Thinking / reasoning events

Emitted by reasoning models such as o1 and DeepSeek-R1.

| Event | Key fields | Description |
| --- | --- | --- |
| THINKING_TEXT_MESSAGE_START | run_id, parent_message_id, thinking_message_id | Start of reasoning output |
| THINKING_TEXT_MESSAGE_CONTENT | thinking_message_id, delta | Incremental reasoning fragment |
| THINKING_TEXT_MESSAGE_END | thinking_message_id | End of reasoning output |
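Reasoning fragments arrive under their own event types, so a UI can route them to a separate pane instead of mixing them into the answer. A rough sketch of that routing follows; `split_reasoning` is a hypothetical function, and the events are stand-in objects carrying only the fields named in the tables above.

```python
from types import SimpleNamespace


def split_reasoning(events):
    """Hypothetical helper: return (reasoning_text, answer_text) from a stream."""
    reasoning, answer = [], []
    for ev in events:
        if ev.type == "THINKING_TEXT_MESSAGE_CONTENT":
            reasoning.append(ev.delta)
        elif ev.type == "TEXT_MESSAGE_CONTENT":
            answer.append(ev.delta)
        # START/END events carry no text, so they are ignored here
    return "".join(reasoning), "".join(answer)


# Stand-in events (assumed shapes, not real library objects)
events = [
    SimpleNamespace(type="THINKING_TEXT_MESSAGE_START", run_id="run_1",
                    parent_message_id="msg_1", thinking_message_id="think_1"),
    SimpleNamespace(type="THINKING_TEXT_MESSAGE_CONTENT",
                    thinking_message_id="think_1", delta="2 + 2 is 4. "),
    SimpleNamespace(type="THINKING_TEXT_MESSAGE_END", thinking_message_id="think_1"),
    SimpleNamespace(type="TEXT_MESSAGE_START", run_id="run_1", message_id="msg_1"),
    SimpleNamespace(type="TEXT_MESSAGE_CONTENT", message_id="msg_1", delta="The answer is 4."),
    SimpleNamespace(type="TEXT_MESSAGE_END", message_id="msg_1"),
]

reasoning_text, answer_text = split_reasoning(events)
print("reasoning:", reasoning_text)
print("answer:", answer_text)
```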

Tool call events

| Event | Key fields | Description |
| --- | --- | --- |
| TOOL_CALL_START | run_id, tool_call_id, parent_message_id, name | Start of a tool call |
| TOOL_CALL_ARGS | tool_call_id, delta | Incremental argument JSON |
| TOOL_CALL_END | tool_call_id | End of argument streaming |
| TOOL_CALL_RESULT | tool_call_id, content, role | Result after tool execution |
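Tool arguments stream as JSON fragments, so they can only be parsed once TOOL_CALL_END arrives. The sketch below illustrates one way to do that, assuming the concatenated deltas of a call form a single valid JSON document. `collect_tool_calls` is a hypothetical helper, and the events are stand-ins with the fields from the table above.

```python
import json
from types import SimpleNamespace


def collect_tool_calls(events):
    """Hypothetical helper: rebuild each tool call from its streamed events."""
    names = {}       # tool_call_id -> tool name
    arg_chunks = {}  # tool_call_id -> list of JSON fragments
    calls = {}       # tool_call_id -> {"name", "args", "result"}
    for ev in events:
        if ev.type == "TOOL_CALL_START":
            names[ev.tool_call_id] = ev.name
            arg_chunks[ev.tool_call_id] = []
        elif ev.type == "TOOL_CALL_ARGS":
            arg_chunks.setdefault(ev.tool_call_id, []).append(ev.delta)
        elif ev.type == "TOOL_CALL_END":
            # Assumption: the joined fragments are one complete JSON document
            raw = "".join(arg_chunks.pop(ev.tool_call_id, []))
            calls[ev.tool_call_id] = {
                "name": names.get(ev.tool_call_id),
                "args": json.loads(raw) if raw else {},
                "result": None,
            }
        elif ev.type == "TOOL_CALL_RESULT":
            calls.setdefault(ev.tool_call_id, {})["result"] = ev.content
    return calls


# Stand-in events (assumed shapes, not real library objects)
events = [
    SimpleNamespace(type="TOOL_CALL_START", run_id="run_1", tool_call_id="call_1",
                    parent_message_id="msg_1", name="get_weather"),
    SimpleNamespace(type="TOOL_CALL_ARGS", tool_call_id="call_1", delta='{"city": "Par'),
    SimpleNamespace(type="TOOL_CALL_ARGS", tool_call_id="call_1", delta='is"}'),
    SimpleNamespace(type="TOOL_CALL_END", tool_call_id="call_1"),
    SimpleNamespace(type="TOOL_CALL_RESULT", tool_call_id="call_1",
                    content="sunny", role="tool"),
]

calls = collect_tool_calls(events)
print(calls["call_1"])
```

Parsing a partial fragment such as `{"city": "Par` would raise `json.JSONDecodeError`, which is why the sketch defers `json.loads` until the end event.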

Run lifecycle events

| Event | Key fields | Description |
| --- | --- | --- |
| RUN_STARTED | agent_id, run_id, root_run_id, timestamp | Run started |
| RUN_FINISHED | run_id, result, timestamp | Run completed successfully |
| RUN_ERROR | run_id, message, timestamp | Run failed |
| TRANSPORT_ERROR | message, timestamp | Transport-level error |
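One detail in the table is worth noting: TRANSPORT_ERROR lists no run_id, so a consumer cannot attribute it to a particular run from the event alone. The sketch below tracks per-run status and collects transport errors separately. `track_runs` and the status strings are illustrative choices, and the stand-in events include only the fields the code reads.

```python
from types import SimpleNamespace


def track_runs(events):
    """Hypothetical helper: map run_id -> status and collect transport errors."""
    status = {}
    transport_errors = []
    for ev in events:
        if ev.type == "RUN_STARTED":
            status[ev.run_id] = "running"
        elif ev.type == "RUN_FINISHED":
            status[ev.run_id] = "finished"
        elif ev.type == "RUN_ERROR":
            status[ev.run_id] = "failed"
        elif ev.type == "TRANSPORT_ERROR":
            # No run_id listed for this event, so keep it outside the per-run map
            transport_errors.append(ev.message)
    return status, transport_errors


# Stand-in events (assumed shapes, not real library objects)
events = [
    SimpleNamespace(type="RUN_STARTED", agent_id="agent_1", run_id="run_1", root_run_id="run_1"),
    SimpleNamespace(type="RUN_STARTED", agent_id="agent_1", run_id="run_2", root_run_id="run_1"),
    SimpleNamespace(type="RUN_ERROR", run_id="run_2", message="tool timed out"),
    SimpleNamespace(type="RUN_FINISHED", run_id="run_1", result="ok"),
    SimpleNamespace(type="TRANSPORT_ERROR", message="connection reset"),
]

status, transport_errors = track_runs(events)
print(status)
print(transport_errors)
```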

Handler pattern

Events are Pydantic models from nexau.archs.llm.llm_aggregators.events. Match on event.type:
from nexau.archs.llm.llm_aggregators import Event

def handle_event(event: Event) -> None:
    match event.type:
        case "TEXT_MESSAGE_START":
            pass  # reset your UI buffer
        case "TEXT_MESSAGE_CONTENT":
            print(event.delta, end="", flush=True)
        case "TEXT_MESSAGE_END":
            pass  # flush the message
        case "TOOL_CALL_START":
            print(f"\n[calling {event.name}]")
        case "TOOL_CALL_ARGS":
            pass  # event.tool_call_id, event.delta
        case "RUN_FINISHED":
            print(f"\nDone: {event.result}")
        case "RUN_ERROR" | "TRANSPORT_ERROR":
            # Both event types carry a message field; surface it to users and logs
            print(f"\nError: {event.message}")
        case _:
            pass
Always handle RUN_ERROR and TRANSPORT_ERROR — they’re how you surface failures to users and logs.

Using events with HTTP/SSE transport

When your agent is served over HTTP, use SSEClient.stream_events() to consume the event stream:
import asyncio

from nexau.archs.transports.http.sse_client import SSEClient


async def main() -> None:
    client = SSEClient("http://localhost:8000")

    # async for is only valid inside a coroutine
    async for event in client.stream_events(
        message="Summarize the latest report",
        user_id="user_123",
        session_id="sess_456",
    ):
        if event["type"] == "event":
            # Streaming event payload
            ev = event["event"]
            if ev.get("type") == "TEXT_MESSAGE_CONTENT":
                print(ev.get("delta", ""), end="", flush=True)
        elif event["type"] == "complete":
            # Final response, sent once the run is done
            print(f"\n\n{event.get('response')}")


asyncio.run(main())
Each yielded item has a type key. "event" items carry the streaming event payload; "complete" signals the run is done and includes the final response.
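The item shapes described above can be exercised without a running server. In the sketch below, `fake_stream` is a hypothetical async generator standing in for `client.stream_events(...)`, and `consume` is an illustrative helper. It also assumes that error events arrive inside "event" items in the same shape as other events, which the text above does not confirm.

```python
import asyncio


async def fake_stream():
    """Hypothetical stand-in for client.stream_events(...)."""
    items = [
        {"type": "event", "event": {"type": "TEXT_MESSAGE_START", "message_id": "msg_1"}},
        {"type": "event", "event": {"type": "TEXT_MESSAGE_CONTENT", "message_id": "msg_1", "delta": "Revenue "}},
        {"type": "event", "event": {"type": "TEXT_MESSAGE_CONTENT", "message_id": "msg_1", "delta": "grew."}},
        {"type": "event", "event": {"type": "TEXT_MESSAGE_END", "message_id": "msg_1"}},
        {"type": "complete", "response": "Revenue grew."},
    ]
    for item in items:
        yield item


async def consume(stream):
    """Return (streamed_text, final_response) from any async iterator of items."""
    chunks = []
    final = None
    async for item in stream:
        if item["type"] == "event":
            ev = item["event"]
            kind = ev.get("type")
            if kind == "TEXT_MESSAGE_CONTENT":
                chunks.append(ev.get("delta", ""))
            elif kind in ("RUN_ERROR", "TRANSPORT_ERROR"):
                # Assumption: errors are delivered as ordinary "event" items
                raise RuntimeError(ev.get("message", "unknown error"))
        elif item["type"] == "complete":
            final = item.get("response")
    return "".join(chunks), final


streamed, final = asyncio.run(consume(fake_stream()))
print(streamed)
print(final)
```

Because `consume` accepts any async iterator, the same function could be pointed at the real stream in place of `fake_stream()`.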