Transports are experimental. APIs may change in future releases.
Transports let you serve your agent over a network or process boundary. You can run an HTTP server with Server-Sent Events for real-time streaming, use stdio for CLI pipelines and subprocess integration, or start an interactive chat session directly in your terminal.
HTTP + SSE
Start the server
uv run nexau serve http --config agent.yaml --port 8000
Or start it programmatically:
from nexau.archs.transports.http import SSETransportServer, HTTPConfig
from nexau.archs.session.orm import SQLDatabaseEngine

# `await` must run inside an async function (or an async-aware REPL).
engine = SQLDatabaseEngine.from_url("sqlite+aiosqlite:///sessions.db")
await engine.setup_models()

# `agent_config` is your agent configuration, defined elsewhere.
server = SSETransportServer(
    engine=engine,
    default_agent_config=agent_config,
    config=HTTPConfig(
        host="0.0.0.0",
        port=8000,
        cors_origins=["*"],
        log_level="info",
    ),
)
server.start()
Endpoints
Method  Path     Description
POST    /query   Synchronous query: waits for a complete response
GET     /stream  Streaming query: returns Server-Sent Events
POST    /stop    Stop a running agent and persist its current state
GET     /health  Health check
curl examples
Synchronous query
curl -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{
    "message": "What is AI?",
    "user_id": "user123",
    "session_id": "sess456"
  }'
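The same request can be built from Python with only the standard library. This is a minimal sketch, not part of NexAU: the helper names `build_query_request` and `send_query` are hypothetical, the payload fields mirror the curl example above, and the sketch assumes the response body is JSON without assuming any particular fields in it.

```python
import json
import urllib.request


def build_query_request(
    base_url: str, message: str, user_id: str, session_id: str
) -> urllib.request.Request:
    """Build a POST /query request with the same JSON body as the curl example."""
    payload = {"message": message, "user_id": user_id, "session_id": session_id}
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/query",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_query(request: urllib.request.Request, timeout: float = 60.0):
    """Send the request and return the parsed body.

    Requires a running server; assumes the body is JSON.
    """
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

With a server running on port 8000, `send_query(build_query_request("http://localhost:8000", "What is AI?", "user123", "sess456"))` would issue the request. Separating construction from sending makes it possible to inspect the request without a network call.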
The streaming endpoint emits SSE events like:
data: {"type": "event", "event": {"type": "TEXT_MESSAGE_CONTENT", "delta": "Hello"}}
data: {"type": "event", "event": {"type": "TEXT_MESSAGE_CONTENT", "delta": " there"}}
data: {"type": "complete", "response": "Hello there", "session_id": "sess456"}
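A client that reads this stream itself has to cope with network chunks that end in the middle of a line. The following is a rough sketch of one way to do that in Python, assuming only the `data: ` line format and the `event` / `complete` payload shapes shown above. The function names `iter_sse_payloads` and `collect_text` are hypothetical and not part of NexAU.

```python
import json
from typing import Iterable, Iterator, Optional, Tuple

PREFIX = "data: "


def iter_sse_payloads(chunks: Iterable[str]) -> Iterator[dict]:
    """Yield decoded JSON payloads from `data: ` lines.

    Partial lines are buffered until the rest of the line arrives.
    """
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last newline is complete; the remainder stays buffered.
        *lines, buffer = buffer.split("\n")
        for line in lines:
            line = line.rstrip("\r")
            if line.startswith(PREFIX):
                yield json.loads(line[len(PREFIX):])
    # Handle a final line that arrived without a trailing newline.
    if buffer.startswith(PREFIX):
        yield json.loads(buffer[len(PREFIX):])


def collect_text(payloads: Iterable[dict]) -> Tuple[str, Optional[str]]:
    """Join TEXT_MESSAGE_CONTENT deltas and return (text, session_id)."""
    parts = []
    session_id = None
    for payload in payloads:
        if payload.get("type") == "event":
            event = payload.get("event", {})
            if event.get("type") == "TEXT_MESSAGE_CONTENT":
                parts.append(event.get("delta", ""))
        elif payload.get("type") == "complete":
            session_id = payload.get("session_id")
            break  # the stream is finished
    return "".join(parts), session_id
```

Feeding the three example events through `collect_text(iter_sse_payloads(chunks))` reassembles the deltas into one string, regardless of where the chunk boundaries fall.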
Python client
Use the built-in SSEClient to query or stream from Python:
from nexau.archs.transports.http.sse_client import SSEClient

client = SSEClient("http://localhost:8000")

# Synchronous
response = await client.query(
    message="What is AI?",
    user_id="user_123",
    session_id="sess_456",
)

# Streaming
async for event in client.stream_events(
    message="What is AI?",
    user_id="user_123",
    session_id="sess_456",
):
    if event["type"] == "event":
        print(event["event"].get("delta", ""), end="", flush=True)
    elif event["type"] == "complete":
        print(f"\nSession: {event.get('session_id')}")
JavaScript streaming (TypeScript)
async function streamResponse(message: string) {
  const url = `http://localhost:8000/stream?message=${encodeURIComponent(message)}&user_id=user123&session_id=sess456`;
  const response = await fetch(url);
  const reader = response.body?.getReader();
  if (!reader) throw new Error("Response has no body");
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters intact across chunk boundaries.
    buffer += decoder.decode(value, { stream: true });
    // A chunk can end mid-line, so keep the trailing partial line in the buffer.
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));
        if (data.type === "event" && data.event.type === "TEXT_MESSAGE_CONTENT") {
          // setResponse is a state setter supplied by the caller (for example, from React's useState).
          setResponse(prev => prev + data.event.delta);
        }
      }
    }
  }
}
HTTPConfig fields
host
Address to bind the server to.
port
Port number to listen on.
cors_origins
list[str]
default: ["*"]
List of allowed CORS origins. Use specific origins in production.
log_level
Uvicorn log level. One of debug, info, warning, error, critical.
Stdio
The stdio transport reads JSON-RPC 2.0 requests from stdin and writes responses to stdout. Use it for CLI pipelines or subprocess integration.
Start the server
uv run nexau serve stdio --config agent.yaml
JSON-RPC protocol
Request
{
  "jsonrpc": "2.0",
  "method": "run",
  "params": {
    "message": "Hello",
    "user_id": "user_123",
    "session_id": "sess_456",
    "context": {}
  },
  "id": 1
}
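A small helper can produce this request as a single line of JSON, ready to write to the server's stdin. This is a sketch under assumptions: `make_run_request` is a hypothetical name, the one-request-per-line framing follows the subprocess example in this section, and the incrementing `id` is simply one convenient way to tell requests apart.

```python
import itertools
import json
from typing import Optional

# Monotonically increasing request ids, starting at 1.
_request_ids = itertools.count(1)


def make_run_request(
    message: str,
    user_id: str,
    session_id: str,
    context: Optional[dict] = None,
) -> str:
    """Return a JSON-RPC 2.0 `run` request serialized as one newline-terminated line."""
    request = {
        "jsonrpc": "2.0",
        "method": "run",
        "params": {
            "message": message,
            "user_id": user_id,
            "session_id": session_id,
            "context": context if context is not None else {},
        },
        "id": next(_request_ids),
    }
    # json.dumps escapes newlines inside strings, so the payload stays on a single line.
    return json.dumps(request) + "\n"
```

Because `json.dumps` escapes embedded newlines, a multi-line `message` still serializes to exactly one line.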
Subprocess integration
import subprocess
import json

process = subprocess.Popen(
    ["uv", "run", "nexau", "serve", "stdio", "--config", "agent.yaml"],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    text=True,
)

request = {
    "jsonrpc": "2.0",
    "method": "run",
    "params": {
        "message": "Hello",
        "user_id": "user_123",
        "session_id": "sess_456",
    },
    "id": 1,
}

process.stdin.write(json.dumps(request) + "\n")
process.stdin.flush()

for line in process.stdout:
    response = json.loads(line)
    if response["result"]["type"] == "event":
        print(response["result"]["event"].get("delta", ""), end="")
    elif response["result"]["type"] == "complete":
        print(f"\nDone: {response['result']['response']}")
        break
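The read loop above indexes `response["result"]` directly, which raises `KeyError` if the server replies with an error instead. A more defensive reader might look like the sketch below. It assumes that failures arrive as a standard JSON-RPC 2.0 `error` object with `code` and `message` members, which this page does not confirm. `JsonRpcError` and `iter_results` are hypothetical names.

```python
import json
from typing import Iterable, Iterator


class JsonRpcError(RuntimeError):
    """Raised when a response carries a JSON-RPC `error` member."""


def iter_results(lines: Iterable[str]) -> Iterator[dict]:
    """Yield each `result` object, stopping after the `complete` result."""
    for raw in lines:
        line = raw.strip()
        if not line:
            continue  # skip blank lines between messages
        response = json.loads(line)
        if "error" in response:
            # Assumed shape: {"code": int, "message": str}, per JSON-RPC 2.0.
            error = response["error"]
            raise JsonRpcError(f"{error.get('code')}: {error.get('message')}")
        result = response["result"]
        yield result
        if result.get("type") == "complete":
            return
```

In the subprocess example, `for result in iter_results(process.stdout):` could replace the manual loop, with `result["type"]` checked in the same way.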
CLI chat
Start an interactive terminal session with your agent:
uv run nexau chat --config agent.yaml
For scripted or non-interactive use:
uv run nexau chat \
  --config agent.yaml \
  --query "Hello" \
  --user-id "user123" \
  --session-id "sess456"
Common errors
OSError: Address already in use
Another process is already using port 8000. Start the server on a different port:
uv run nexau serve http --config agent.yaml --port 8080
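When the server is started programmatically, it can help to check the port before constructing the config. The sketch below uses only the standard `socket` module. `port_is_free` and `find_free_port` are hypothetical helpers, not part of NexAU, and the check is inherently racy because another process can take the port between the check and the server start.

```python
import socket


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently bind to (host, port)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            # Typically "Address already in use".
            return False
        return True


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.bind((host, 0))
        return probe.getsockname()[1]
```

A caller could fall back with something like `port = 8000 if port_is_free(8000) else find_free_port()` and pass the result to `HTTPConfig(port=port)`.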
CORS error in the browser
The server's cors_origins list doesn't include your frontend's origin. Update your HTTPConfig:
config = HTTPConfig(
    cors_origins=["http://localhost:3000", "https://yourdomain.com"],
)
Streaming not working — no events received
Make sure you're calling the /stream endpoint, not /query, and that curl's -N flag is set to disable buffering:
# Wrong: synchronous endpoint
curl "http://localhost:8000/query?message=Hello"
# Correct: streaming endpoint
curl -N "http://localhost:8000/stream?message=Hello"