Server Integration
This guide covers practical integration patterns for common server scenarios. Each pattern demonstrates how to combine EventSink, InputSource, and PermissionPolicy for specific deployment environments. Use these as starting points and adapt them to your requirements.
The patterns range from simple single-user servers to multi-tenant deployments handling thousands of concurrent sessions. Choose the pattern that matches your scale and requirements, then customize the details.
WebSocket Server
WebSocket connections provide bidirectional communication, making them ideal for real-time chat interfaces. Each connection gets its own agent session with streaming responses.
Architecture
Browser ←→ WebSocket Handler ←→ Agent
   │               │              │
   │   JSON msgs   │ InputSource  │
   │               │              │
   │  JSON events  │  EventSink   │
Implementation
use axum::{
extract::ws::{WebSocket, WebSocketUpgrade},
routing::get,
Router,
};
use agent_air::{AgentAir, ControllerInputPayload, UiMessage, TurnId};
use tokio::sync::mpsc;
async fn websocket_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(handle_socket)
}
/// Drives a single WebSocket connection.
///
/// Creates one agent for the connection, forwards JSON text frames from the
/// client to the agent as user turns, and forwards every agent event back to
/// the client as a JSON text frame. Returns when the client disconnects, the
/// socket errors, or either channel to the agent closes.
async fn handle_socket(mut socket: WebSocket) {
    // Create channels for this connection
    let (input_tx, input_rx) = mpsc::channel(100);
    let (output_tx, mut output_rx) = mpsc::channel(500);

    // Create agent for this session. A bad configuration should end this
    // connection cleanly instead of panicking the connection task.
    let mut agent = match AgentAir::with_config("websocket-agent", CONFIG_PATH, SYSTEM_PROMPT) {
        Ok(agent) => agent,
        Err(err) => {
            eprintln!("Failed to create agent: {:?}", err);
            // Dropping `socket` on return closes the connection.
            return;
        }
    };

    let sink = ChannelEventSink::new(output_tx);
    let source = ChannelInputSource::new(input_rx);
    let policy = AutoApprovePolicy::new();

    // Run agent in background
    let agent_handle = tokio::spawn(async move {
        agent.run_with_frontend(sink, source, policy).await
    });

    // One agent per connection, so a fixed session id is sufficient here.
    let session_id = 1;
    let mut turn_counter = 0u64;

    loop {
        tokio::select! {
            // Handle incoming WebSocket messages
            Some(msg) = socket.recv() => {
                match msg {
                    Ok(Message::Text(text)) => {
                        let client_msg: ClientMessage = match serde_json::from_str(&text) {
                            Ok(m) => m,
                            Err(err) => {
                                // A malformed frame is not fatal; report it and keep the
                                // connection open.
                                eprintln!("Ignoring malformed client message: {}", err);
                                continue;
                            }
                        };
                        turn_counter += 1;
                        let payload = ControllerInputPayload::data(
                            session_id,
                            client_msg.content,
                            TurnId::new_user_turn(turn_counter),
                        );
                        // The agent has stopped reading input: end the connection.
                        if input_tx.send(payload).await.is_err() {
                            break;
                        }
                    }
                    Ok(Message::Close(_)) => break,
                    Err(_) => break,
                    // Binary/ping/pong frames are not part of this protocol.
                    _ => {}
                }
            }
            // Forward agent events to WebSocket
            Some(event) = output_rx.recv() => {
                let json = match serde_json::to_string(&event) {
                    Ok(json) => json,
                    Err(err) => {
                        // Skip an event that cannot be encoded rather than
                        // panicking the whole connection.
                        eprintln!("Failed to serialize agent event: {}", err);
                        continue;
                    }
                };
                if socket.send(Message::Text(json)).await.is_err() {
                    break;
                }
            }
            // Both the socket and the agent output are finished.
            else => break,
        }
    }

    // Cleanup: close the agent's input, and stop receiving its output so that
    // a pending send on the bounded output channel fails instead of waiting
    // for a reader that is gone (assumes the sink forwards onto `output_tx`).
    drop(input_tx);
    drop(output_rx);
    let _ = agent_handle.await;
}
/// Starts the HTTP server on port 3000 with the `/ws` WebSocket route.
#[tokio::main]
async fn main() {
    let app = Router::new().route("/ws", get(websocket_handler));
    let addr = "0.0.0.0:3000".parse().unwrap();

    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}
Client-Side
// Connection to the agent's WebSocket endpoint.
const ws = new WebSocket('ws://localhost:3000/ws');

// Dispatch each server frame on the variant key it carries.
ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);

  if (msg.TextChunk) {
    appendToChat(msg.TextChunk.text);
    return;
  }
  if (msg.Complete) {
    markComplete();
    return;
  }
  if (msg.Error) {
    showError(msg.Error.message);
  }
};

// Send one user message to the agent as a JSON text frame.
function sendMessage(text) {
  const frame = JSON.stringify({ content: text });
  ws.send(frame);
}
HTTP Streaming (Server-Sent Events)
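SSE provides server-to-client streaming over HTTP, with a separate endpoint for user input. This works well when you need HTTP compatibility without WebSocket infrastructure. Note that the example below runs a single agent and broadcasts every event to all clients subscribed to `/events`; filter events by session before serving more than one user from it.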
Architecture
Browser → POST /chat → Agent Input
Browser ← GET /events (SSE) ← Agent Events
Implementation
use axum::{
extract::State,
response::sse::{Event, Sse},
routing::{get, post},
Json, Router,
};
use futures_util::stream::Stream;
use std::sync::Arc;
use tokio::sync::broadcast;
/// State shared by the HTTP handlers.
struct AppState {
    /// Sender used by `chat_handler` to pass user input to the agent.
    input_tx: mpsc::Sender<ControllerInputPayload>,
    /// Broadcast sender that `events_handler` subscribes to for agent events.
    event_tx: broadcast::Sender<UiMessage>,
}
async fn chat_handler(
State(state): State<Arc<AppState>>,
Json(request): Json<ChatRequest>,
) -> impl IntoResponse {
let payload = ControllerInputPayload::data(
request.session_id,
request.message,
TurnId::new_user_turn(request.turn_number)
);
match state.input_tx.send(payload).await {
Ok(_) => Json(json!({"status": "ok"})),
Err(_) => Json(json!({"status": "error", "message": "Agent unavailable"})),
}
}
/// Handles `GET /events`: streams agent events to the client as Server-Sent
/// Events.
///
/// Each event is sent with an SSE event name derived from its variant
/// (`text`, `tool_start`, `tool_end`, `complete`, `error`, or `event`) and the
/// JSON-serialized message as data. The stream ends when the broadcast
/// channel is closed.
async fn events_handler(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.event_tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            let event = match rx.recv().await {
                Ok(event) => event,
                // A slow subscriber missed some events. Keep streaming from the
                // oldest retained event instead of ending the connection.
                Err(broadcast::error::RecvError::Lagged(_)) => continue,
                // All senders are gone: nothing more will arrive.
                Err(broadcast::error::RecvError::Closed) => break,
            };

            let event_type = match &event {
                UiMessage::TextChunk { .. } => "text",
                UiMessage::ToolExecuting { .. } => "tool_start",
                UiMessage::ToolCompleted { .. } => "tool_end",
                UiMessage::Complete { .. } => "complete",
                UiMessage::Error { .. } => "error",
                _ => "event",
            };

            let data = match serde_json::to_string(&event) {
                Ok(data) => data,
                Err(err) => {
                    // Skip an event that cannot be encoded rather than
                    // panicking inside the response stream.
                    eprintln!("Failed to serialize agent event: {}", err);
                    continue;
                }
            };

            yield Ok(Event::default().event(event_type).data(data));
        }
    };

    Sse::new(stream)
}
/// Wires one agent to the `/chat` and `/events` routes and serves them on
/// port 3000.
#[tokio::main]
async fn main() {
    let (input_tx, input_rx) = mpsc::channel(100);
    let (event_tx, _) = broadcast::channel(500);

    // Create and run agent
    let mut agent = AgentAir::with_config("sse-agent", CONFIG_PATH, SYSTEM_PROMPT)
        .expect("Failed to create agent");
    let sink = BroadcastEventSink::new(event_tx.clone());
    let source = ChannelInputSource::new(input_rx);
    let policy = AutoApprovePolicy::new();
    tokio::spawn(async move { agent.run_with_frontend(sink, source, policy).await });

    // Handlers reach the agent through the shared state.
    let shared = Arc::new(AppState { input_tx, event_tx });
    let app = Router::new()
        .route("/chat", post(chat_handler))
        .route("/events", get(events_handler))
        .with_state(shared);

    let addr = "0.0.0.0:3000".parse().unwrap();
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}
Client-Side
// Subscribe to the agent's event stream.
const eventSource = new EventSource('/events');

// The server sends the same serialized message that the WebSocket pattern
// reads as `msg.TextChunk.text`, i.e. the fields are nested under the variant
// name. Accept that shape, and fall back to a flat object so existing
// payloads keep working.
function variantPayload(data, variant) {
  return (data && data[variant]) || data;
}

eventSource.addEventListener('text', (e) => {
  const data = JSON.parse(e.data);
  appendToChat(variantPayload(data, 'TextChunk').text);
});

eventSource.addEventListener('complete', (e) => {
  markComplete();
});

eventSource.addEventListener('error', (e) => {
  // The browser also fires 'error' for connection problems; those events
  // carry no data, so there is nothing to parse or display.
  if (typeof e.data !== 'string') {
    return;
  }
  const data = JSON.parse(e.data);
  showError(variantPayload(data, 'Error').message);
});

// POST one user message to the agent.
async function sendMessage(text, sessionId, turnNumber) {
  await fetch('/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      session_id: sessionId,
      message: text,
      turn_number: turnNumber,
    }),
  });
}
Background Job Processor
Process agent tasks from a job queue without real-time user interaction. Useful for batch processing, scheduled tasks, and async workflows.
Architecture
Job Queue → Worker → Agent → Result Store
    ↑                                    │
    └────────── Retry on failure ────────┘
Implementation
use agent_air::{AgentAir, ControllerInputPayload, UiMessage, TurnId};
/// Worker that pulls jobs from a queue, runs each through a fresh agent, and
/// stores the collected text output.
struct JobProcessor {
    queue: QueueClient,
    results: ResultStore,
}

impl JobProcessor {
    /// Polls the queue forever, processing one job at a time.
    ///
    /// A successfully processed job has its result stored and is then
    /// acknowledged. A failed job is logged and left unacknowledged. Errors
    /// from the queue or the result store end the loop and are returned.
    async fn run(&self) -> Result<(), Error> {
        loop {
            // Fetch next job; back off for a second when the queue is empty.
            let job = if let Some(next) = self.queue.receive().await? {
                next
            } else {
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            };

            // Process the job
            let result = match self.process_job(&job).await {
                Ok(output) => output,
                Err(e) => {
                    log::error!("Job {} failed: {}", job.id, e);
                    // Job will be retried after visibility timeout
                    continue;
                }
            };

            self.results.store(&job.id, &result).await?;
            self.queue.acknowledge(&job.receipt).await?;
        }
    }

    /// Runs one job through a newly created agent and returns the
    /// concatenation of all `TextChunk` texts the agent emitted.
    async fn process_job(&self, job: &Job) -> Result<String, Error> {
        // Output is collected from this channel; input carries a single message.
        let (output_tx, mut output_rx) = mpsc::channel(500);
        let (input_tx, input_rx) = mpsc::channel(1);

        // Create a fresh agent for this job
        let mut agent = AgentAir::with_config(
            "job-processor",
            &job.config_path,
            &job.system_prompt,
        )?;

        let sink = ChannelEventSink::new(output_tx);
        let source = ChannelInputSource::new(input_rx);
        // Auto-approve for background jobs
        let policy = AutoApprovePolicy::new();

        // Start agent
        let agent_handle = tokio::spawn(async move {
            agent.run_with_frontend(sink, source, policy).await
        });

        // Send the job's prompt, then close the input to signal completion.
        let prompt = ControllerInputPayload::data(1, &job.prompt, TurnId::new_user_turn(1));
        input_tx.send(prompt).await?;
        drop(input_tx);

        // Collect response until the output channel closes.
        let mut response = String::new();
        loop {
            match output_rx.recv().await {
                Some(UiMessage::TextChunk { text, .. }) => response.push_str(&text),
                Some(_) => {}
                None => break,
            }
        }

        // First `?` surfaces a task join failure, second the agent's own error.
        agent_handle.await??;
        Ok(response)
    }
}
Multi-Tenant Server
Handle multiple users with separate sessions, routing messages and events by user/session ID.
Architecture
Users → API Gateway → Session Router → Agent Sessions
                            │
                      Session Store
Implementation
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
struct SessionManager {
sessions: Arc<RwLock<HashMap<String, SessionHandle>>>,
config_path: String,
system_prompt: String,
}
struct SessionHandle {
input_tx: mpsc::Sender<ControllerInputPayload>,
session_id: i64,
created_at: Instant,
}
impl SessionManager {
async fn get_or_create_session(&self, user_id: &str) -> Result<SessionHandle, Error> {
// Check for existing session
{
let sessions = self.sessions.read().await;
if let Some(handle) = sessions.get(user_id) {
return Ok(handle.clone());
}
}
// Create new session
let (input_tx, input_rx) = mpsc::channel(100);
let (output_tx, output_rx) = mpsc::channel(500);
let session_id = self.generate_session_id();
let mut agent = AgentAir::with_config(
&format!("user-{}", user_id),
&self.config_path,
&self.system_prompt
)?;
let sink = ChannelEventSink::new(output_tx);
let source = ChannelInputSource::new(input_rx);
let policy = AutoApprovePolicy::new();
// Run agent in background
let user_id_clone = user_id.to_string();
let sessions = self.sessions.clone();
tokio::spawn(async move {
let result = agent.run_with_frontend(sink, source, policy).await;
// Cleanup on exit
sessions.write().await.remove(&user_id_clone);
if let Err(e) = result {
log::error!("Session {} error: {}", user_id_clone, e);
}
});
// Store event receiver somewhere accessible
self.store_event_receiver(user_id, output_rx).await;
let handle = SessionHandle {
input_tx,
session_id,
created_at: Instant::now(),
};
self.sessions.write().await.insert(user_id.to_string(), handle.clone());
Ok(handle)
}
async fn send_message(&self, user_id: &str, message: &str, turn: u64) -> Result<(), Error> {
let handle = self.get_or_create_session(user_id).await?;
let payload = ControllerInputPayload::data(
handle.session_id,
message,
TurnId::new_user_turn(turn)
);
handle.input_tx.send(payload).await
.map_err(|_| Error::SessionClosed)?;
Ok(())
}
async fn cleanup_idle_sessions(&self, max_idle: Duration) {
let mut sessions = self.sessions.write().await;
let now = Instant::now();
sessions.retain(|user_id, handle| {
if now.duration_since(handle.created_at) > max_idle {
log::info!("Cleaning up idle session for {}", user_id);
false
} else {
true
}
});
}
}
Slack/Discord Bot
Integrate with chat platforms by translating their message formats to agent inputs and responses.
Architecture
Chat Platform API ←→ Bot Handler ←→ Agent
│ │
Webhooks/Events Session per channel
Implementation Pattern
struct SlackBot {
agent_sessions: HashMap<String, SessionHandle>, // channel_id -> session
slack_client: SlackClient,
}
impl SlackBot {
async fn handle_message(&mut self, event: SlackMessage) {
let channel_id = &event.channel;
// Get or create session for this channel
let session = self.get_or_create_session(channel_id).await;
// Send message to agent
let payload = ControllerInputPayload::data(
session.session_id,
&event.text,
TurnId::new_user_turn(session.next_turn())
);
session.input_tx.send(payload).await.unwrap();
}
async fn run_event_forwarder(&self, channel_id: String, mut rx: mpsc::Receiver<UiMessage>) {
let mut response_buffer = String::new();
while let Some(event) = rx.recv().await {
match event {
UiMessage::TextChunk { text, .. } => {
response_buffer.push_str(&text);
}
UiMessage::Complete { .. } => {
// Send accumulated response to Slack
if !response_buffer.is_empty() {
self.slack_client.post_message(&channel_id, &response_buffer).await;
response_buffer.clear();
}
}
UiMessage::ToolExecuting { tool_name, .. } => {
// Optionally show typing indicator or status
self.slack_client.show_typing(&channel_id).await;
}
UiMessage::Error { message, .. } => {
self.slack_client.post_message(&channel_id, &format!("Error: {}", message)).await;
}
_ => {}
}
}
}
}
Testing Integration
Test your integration with mock agents and controlled inputs.
/// End-to-end check: a message sent over the WebSocket yields a non-empty
/// text response followed by a `Complete` event.
#[tokio::test]
async fn test_websocket_integration() {
    // Start server
    let server = start_test_server().await;

    // Connect WebSocket client
    let (mut ws, _) = connect_async(&server.ws_url()).await.unwrap();

    // Send a message
    ws.send(Message::Text(r#"{"content":"Hello"}"#.to_string())).await.unwrap();

    // Collect response
    let mut response = String::new();
    let collect = async {
        while let Some(Ok(msg)) = ws.next().await {
            if let Message::Text(raw) = msg {
                let event: UiMessage = serde_json::from_str(&raw).unwrap();
                match event {
                    UiMessage::TextChunk { text, .. } => response.push_str(&text),
                    UiMessage::Complete { .. } => break,
                    _ => {}
                }
            }
        }
    };

    // Bound the wait so a missing `Complete` event fails the test instead of
    // hanging the test run.
    tokio::time::timeout(std::time::Duration::from_secs(30), collect)
        .await
        .expect("timed out waiting for the agent to complete its turn");

    assert!(!response.is_empty());
}
Mock the LLM for deterministic testing, or use a test configuration that limits capabilities for faster iteration.
