Agent Team coordinates a leader agent and multiple teammate agents working in parallel to complete complex tasks.
Agent Team is experimental. APIs may change between releases.
Agent Team is NexAU’s model for parallel task coordination. A leader agent analyzes a user request, spawns teammate agents from pre-configured role templates, creates tasks on a shared task board, and assigns them to teammates. Teammates work independently and communicate through a message bus. All agents stream their output through a single SSE connection, with each event tagged by the originating agent.
Create YAML configs for the leader and each candidate role. The leader receives team coordination tools automatically — you only need to define its domain tools.
curl -X POST http://localhost:8000/team/stream \
  -H "Content-Type: application/json" \
  -d '{"user_id": "u1", "session_id": "s1", "message": "Build a TODO app"}'
The response is an SSE stream. Each event is a TeamStreamEnvelope tagged with the originating agent:
data: {"team_id":"team_abc","agent_id":"leader-001","role_name":"leader","event":{"type":"TEXT_MESSAGE_CONTENT","delta":"Let me plan this..."}}

data: {"team_id":"team_abc","agent_id":"builder-1","role_name":"builder","event":{"type":"TEXT_MESSAGE_CONTENT","delta":"Writing the API..."}}
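On the client side, the stream above can be consumed by reading each `data:` line, decoding its JSON payload, and grouping text deltas by `agent_id`. The following is a minimal sketch using only the standard library. The helper names `parse_sse_data_lines` and `text_by_agent` are hypothetical and not part of NexAU, and the sketch assumes each event carries exactly one single-line JSON payload, as in the sample output.

```python
import json
from collections import defaultdict


def parse_sse_data_lines(raw: str) -> list[dict]:
    """Decode the JSON payload of every `data:` line in an SSE text chunk.

    Assumption: one complete JSON object per `data:` line.
    """
    envelopes = []
    for line in raw.splitlines():
        line = line.strip()
        if line.startswith("data:"):
            envelopes.append(json.loads(line[len("data:"):].strip()))
    return envelopes


def text_by_agent(envelopes: list[dict]) -> dict[str, str]:
    """Concatenate TEXT_MESSAGE_CONTENT deltas per originating agent_id."""
    chunks: dict[str, list[str]] = defaultdict(list)
    for env in envelopes:
        event = env.get("event", {})
        if event.get("type") == "TEXT_MESSAGE_CONTENT":
            chunks[env["agent_id"]].append(event.get("delta", ""))
    return {agent: "".join(parts) for agent, parts in chunks.items()}


# Sample input copied from the stream shown above.
sample = (
    'data: {"team_id":"team_abc","agent_id":"leader-001","role_name":"leader",'
    '"event":{"type":"TEXT_MESSAGE_CONTENT","delta":"Let me plan this..."}}\n'
    "\n"
    'data: {"team_id":"team_abc","agent_id":"builder-1","role_name":"builder",'
    '"event":{"type":"TEXT_MESSAGE_CONTENT","delta":"Writing the API..."}}\n'
)

for agent_id, text in text_by_agent(parse_sse_data_lines(sample)).items():
    print(f"{agent_id}: {text}")
```

Keying on `agent_id` rather than `role_name` keeps output separate when several teammates share the same role.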
Teammates receive a subset of the team coordination tools: message, broadcast, list_teammates, list_tasks, claim_task, update_task_status, and release_task. Teammates cannot spawn other teammates, create tasks, or end the team run.
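To make the claim and release semantics of a shared task board concrete, here is a small in-memory sketch. It is an illustration only and not NexAU's implementation: the `Task` and `TaskBoard` classes, the status strings, and the method signatures are all assumptions. Only the tool names `claim_task`, `update_task_status`, `release_task`, and `list_tasks` come from the text above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Task:
    task_id: str
    title: str
    status: str = "pending"          # hypothetical states: pending, in_progress, completed
    owner: Optional[str] = None      # agent_id of the teammate holding the task


class TaskBoard:
    """Hypothetical in-memory task board shared by a leader and its teammates."""

    def __init__(self) -> None:
        self._tasks: dict[str, Task] = {}

    def create_task(self, task_id: str, title: str) -> Task:
        # In the model described above, only the leader creates tasks.
        task = Task(task_id=task_id, title=title)
        self._tasks[task_id] = task
        return task

    def list_tasks(self) -> list[Task]:
        return list(self._tasks.values())

    def claim_task(self, task_id: str, agent_id: str) -> bool:
        """Claim an unowned task; return False if another agent already holds it."""
        task = self._tasks[task_id]
        if task.owner is not None:
            return False
        task.owner = agent_id
        task.status = "in_progress"
        return True

    def update_task_status(self, task_id: str, agent_id: str, status: str) -> None:
        task = self._tasks[task_id]
        if task.owner != agent_id:
            raise PermissionError(f"{agent_id} does not own {task_id}")
        task.status = status

    def release_task(self, task_id: str, agent_id: str) -> None:
        """Give a task back so that another teammate can claim it."""
        task = self._tasks[task_id]
        if task.owner != agent_id:
            raise PermissionError(f"{agent_id} does not own {task_id}")
        task.owner = None
        task.status = "pending"


board = TaskBoard()
board.create_task("t1", "Write the API")
print(board.claim_task("t1", "builder-1"))  # first claim succeeds
print(board.claim_task("t1", "builder-2"))  # second claim is rejected
```

Rejecting a second claim instead of overwriting the owner is what lets teammates work independently without duplicating each other's tasks.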
You can use AgentTeam directly without starting an HTTP server — useful for scripts, notebooks, or embedding team coordination in your own application.
run() blocks until the leader calls finish_team, then returns the leader’s final response as a string. You don’t need to call initialize() separately.
async def main() -> None:
    # ... same setup as above ...
    async for envelope in team.run_streaming(message="Build a TODO app"):
        print(f"[{envelope.role_name}:{envelope.agent_id}] {envelope.event}")

asyncio.run(main())
run_streaming() returns an async generator of TeamStreamEnvelope objects — the same format used by the HTTP SSE endpoint. You can also pass an on_envelope callback for side effects like persistence:
envelopes: list[TeamStreamEnvelope] = []
async for envelope in team.run_streaming(
    message="Build a TODO app",
    on_envelope=envelopes.append,
):
    pass  # or process each envelope in real time
# envelopes now contains the full event history
Both run() and run_streaming() accept a variables argument. Pass a ContextValue to inject runtime variables:
from nexau.archs.main_sub.context_value import ContextValue

variables = ContextValue(
    template={"date": "2026-03-04", "username": "alice"},
    runtime_vars={"api_key": "sk-..."},
    sandbox_env={"WORKSPACE": "/tmp/project"},
)
result = asyncio.run(team.run(message="Build a TODO app", variables=variables))
By default, AgentTeam uses InMemoryDatabaseEngine, which does not survive process restarts. For production or long-running workflows, swap it for SQLDatabaseEngine:
from nexau.archs.session.orm import SQLDatabaseEngine

engine = SQLDatabaseEngine.from_url("sqlite+aiosqlite:///team.db")
Team state — tasks, messages, and team membership — is persisted and automatically restored on the next run() call with the same user_id and session_id.
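The idea of restoring state by `user_id` and `session_id` can be illustrated with a standalone sketch built on Python's `sqlite3` module. This is not NexAU's schema or API: the `team_state` table and the `save_state` and `load_state` helpers are hypothetical, and in NexAU the database engine handles this for you. The sketch only shows why reusing the same key pair brings back the earlier state.

```python
import json
import sqlite3
from typing import Optional


def _ensure_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical table: one JSON blob of team state per (user_id, session_id).
    conn.execute(
        "CREATE TABLE IF NOT EXISTS team_state ("
        "user_id TEXT, session_id TEXT, state TEXT, "
        "PRIMARY KEY (user_id, session_id))"
    )


def save_state(conn: sqlite3.Connection, user_id: str, session_id: str, state: dict) -> None:
    """Insert or overwrite the state stored under (user_id, session_id)."""
    _ensure_schema(conn)
    conn.execute(
        "INSERT OR REPLACE INTO team_state VALUES (?, ?, ?)",
        (user_id, session_id, json.dumps(state)),
    )
    conn.commit()


def load_state(conn: sqlite3.Connection, user_id: str, session_id: str) -> Optional[dict]:
    """Return the stored state, or None if this key pair has never been saved."""
    _ensure_schema(conn)
    row = conn.execute(
        "SELECT state FROM team_state WHERE user_id = ? AND session_id = ?",
        (user_id, session_id),
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = sqlite3.connect(":memory:")  # use a file path to survive restarts
save_state(conn, "u1", "s1", {"tasks": ["Write the API"], "members": ["builder-1"]})
print(load_state(conn, "u1", "s1"))  # same key pair: earlier state comes back
print(load_state(conn, "u1", "s2"))  # different session_id: nothing to restore
```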