Skip to main content
NexAU supports both sync and async execution. The rule is simple: use the sync API in scripts with no running event loop, and the async API everywhere else.

Choosing an API

ContextCreate agentRun agent
CLI / script (no event loop)Agent(config=...)agent.run(message=...)
Async handler (FastAPI, Transport, AgentTeam)await Agent.create(config=...)await agent.run_async(message=...)

Sync path

Use this in CLI scripts and __main__ entry points:
from nexau import Agent, AgentConfig, LLMConfig

config = AgentConfig(
    name="my_agent",
    llm_config=LLMConfig(model="gpt-4o"),
    system_prompt="You are a helpful assistant.",
)

agent = Agent(config=config)
response = agent.run(message="Hello!")
print(response)
agent.run() internally calls asyncio.run(self.run_async(...)). If a running event loop is already present on the current thread, it raises RuntimeError — use await agent.run_async() instead.

Async path

Use this in async handlers, transports, and anywhere a running event loop exists:
from nexau import Agent, AgentConfig, LLMConfig

config = AgentConfig(
    name="my_agent",
    llm_config=LLMConfig(model="gpt-4o"),
    system_prompt="You are a helpful assistant.",
)

agent = await Agent.create(config=config, session_manager=sm)
response = await agent.run_async(message="Hello!")
Use Agent.create() instead of Agent() in async contexts. Agent initialization involves async I/O (database writes, MCP discovery, storage restoration). Agent() cannot await, so it may attempt to create a nested event loop — await Agent.create() runs all initialization natively on the current loop.

Writing tools

Define a regular function. The executor automatically dispatches it to a thread pool via asyncio.to_thread() so it never blocks the event loop:
from nexau import Tool

def my_search(query: str) -> dict:
    import requests
    result = requests.get(f"https://api.example.com/search?q={query}")
    return {"results": result.json()}

tool = Tool(
    name="my_search",
    description="Search the web",
    input_schema={"type": "object", "properties": {"query": {"type": "string"}}},
    implementation=my_search,
)

Async tools

For native async I/O (WebSocket, streaming HTTP), define an async def implementation. The executor awaits it directly — no thread pool:
import httpx

async def my_async_search(query: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.get(f"https://api.example.com/search?q={query}")
        return {"results": resp.json()}

How the executor dispatches tools

Tool typeDispatch strategy
Sync implementationawait asyncio.to_thread(tool.execute, ...)
Async implementationawait tool.execute_async(...) — directly awaited
has_native_async_execute = True (e.g. MCPTool)await tool.execute_async(...)
You don’t need to manage dispatch manually — define your implementation correctly and the executor handles the rest.

Common anti-patterns

# Wrong — asyncio.run() cannot be called inside a running loop
async def handle(request):
    agent = await Agent.create(config=config)
    response = agent.run(message="hello")  # RuntimeError!
# Correct
async def handle(request):
    agent = await Agent.create(config=config)
    response = await agent.run_async(message="hello")
# Wrong — sync __init__ may trigger nested loop
async def handle(request):
    agent = Agent(config=config)
# Correct
async def handle(request):
    agent = await Agent.create(config=config)
    response = await agent.run_async(message=request.message)
# Wrong — monkey-patches the running loop, incompatible with uvloop/TaskGroup
import nest_asyncio
nest_asyncio.apply()
asyncio.run(inner())
# Correct — await all the way in async contexts
async def outer():
    result = await inner()
# Wrong — prone to leaks
loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(some_coro())
finally:
    loop.close()
# Correct — sync context
result = asyncio.run(some_coro())

# Correct — async context
result = await some_coro()

Middleware and async

Middleware hooks (before_model, after_tool, etc.) are a sync API. On the async execution path, the executor runs them via asyncio.to_thread() so they don’t block the main loop. Define middleware methods as regular functions — not async def:
from nexau.archs.main_sub.execution.hooks import Middleware, HookResult

class MyMiddleware(Middleware):
    def before_model(self, hook_input):
        # Sync — the framework bridges to async automatically
        return HookResult.no_changes()