Server Integration

This guide covers practical integration patterns for common server scenarios. Each pattern demonstrates how to combine EventSink, InputSource, and PermissionPolicy for specific deployment environments. Use these as starting points and adapt them to your requirements.

The patterns range from simple single-user servers to multi-tenant deployments handling thousands of concurrent sessions. Choose the pattern that matches your scale and requirements, then customize the details.


WebSocket Server

WebSocket connections provide bidirectional communication, making them ideal for real-time chat interfaces. Each connection gets its own agent session with streaming responses.

Architecture

Browser  ←→  WebSocket Handler  ←→  Agent
   │              │                   │
   │ JSON msgs    │ InputSource       │
   │              │                   │
   │ JSON events  │ EventSink         │

Implementation

use axum::{
    extract::ws::{WebSocket, WebSocketUpgrade},
    routing::get,
    Router,
};
use agent_air::{AgentAir, ControllerInputPayload, UiMessage, TurnId};
use tokio::sync::mpsc;

/// Route handler for the WebSocket endpoint.
///
/// Accepts the upgrade request and hands the upgraded socket to
/// `handle_socket`, which runs the per-connection agent session.
async fn websocket_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(handle_socket)
}

/// Runs one agent session for the lifetime of a single WebSocket connection.
///
/// Text frames from the client are parsed as `ClientMessage` JSON and forwarded
/// to the agent as user turns; every event the agent emits is serialized to
/// JSON and sent back as a text frame. The loop ends when the client closes
/// the socket, a socket error occurs, or either channel to the agent closes.
async fn handle_socket(mut socket: WebSocket) {
    // Create channels for this connection
    let (input_tx, input_rx) = mpsc::channel(100);
    let (output_tx, mut output_rx) = mpsc::channel(500);

    // Create agent for this session
    let mut agent = AgentAir::with_config(
        "websocket-agent",
        CONFIG_PATH,
        SYSTEM_PROMPT
    ).expect("Failed to create agent");

    let sink = ChannelEventSink::new(output_tx);
    let source = ChannelInputSource::new(input_rx);
    let policy = AutoApprovePolicy::new();

    // Run agent in background
    let agent_handle = tokio::spawn(async move {
        agent.run_with_frontend(sink, source, policy).await
    });

    let session_id = 1;
    let mut turn_counter = 0u64;

    loop {
        tokio::select! {
            // Handle incoming WebSocket messages
            Some(msg) = socket.recv() => {
                match msg {
                    Ok(Message::Text(text)) => {
                        // Malformed client JSON is ignored rather than ending the session.
                        let client_msg: ClientMessage = match serde_json::from_str(&text) {
                            Ok(m) => m,
                            Err(_) => continue,
                        };

                        turn_counter += 1;
                        let payload = ControllerInputPayload::data(
                            session_id,
                            client_msg.content,
                            TurnId::new_user_turn(turn_counter)
                        );

                        if input_tx.send(payload).await.is_err() {
                            break;
                        }
                    }
                    Ok(Message::Close(_)) => break,
                    Err(_) => break,
                    _ => {}
                }
            }

            // Forward agent events to WebSocket
            Some(event) = output_rx.recv() => {
                // An event that cannot be serialized is skipped instead of
                // panicking the whole connection task.
                let json = match serde_json::to_string(&event) {
                    Ok(json) => json,
                    Err(_) => continue,
                };
                if socket.send(Message::Text(json)).await.is_err() {
                    break;
                }
            }

            else => break,
        }
    }

    // Cleanup: close both ends of the agent's channels before waiting for it.
    // Dropping the input sender ends the agent's input stream; dropping the
    // output receiver ensures an agent that is blocked on a full output
    // channel fails its send instead of waiting forever for a reader that is
    // gone, which would make the await below hang.
    drop(input_tx);
    drop(output_rx);
    let _ = agent_handle.await;
}

/// Entry point: serves the WebSocket endpoint at `/ws` on port 3000.
#[tokio::main]
async fn main() {
    let addr: std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap();
    let routes = Router::new().route("/ws", get(websocket_handler));
    let service = routes.into_make_service();

    axum::Server::bind(&addr).serve(service).await.unwrap();
}

Client-Side

const ws = new WebSocket('ws://localhost:3000/ws');

// Each frame is one JSON-encoded agent event keyed by its variant name.
ws.onmessage = ({ data }) => {
    const msg = JSON.parse(data);

    if (msg.TextChunk) {
        appendToChat(msg.TextChunk.text);
        return;
    }
    if (msg.Complete) {
        markComplete();
        return;
    }
    if (msg.Error) {
        showError(msg.Error.message);
    }
};

// Sends one user message to the server as a `{ content }` JSON frame.
function sendMessage(text) {
    const frame = JSON.stringify({ content: text });
    ws.send(frame);
}

HTTP Streaming (Server-Sent Events)

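SSE provides server-to-client streaming over HTTP, with a separate endpoint for user input. This works well when you need HTTP compatibility without WebSocket infrastructure. Note that the example below runs a single shared agent and broadcasts every event to all connected clients; for per-user isolation, combine it with the session routing shown in the Multi-Tenant Server pattern.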

Architecture

Browser  →  POST /chat        →  Agent Input
Browser  ←  GET /events (SSE) ←  Agent Events

Implementation

use axum::{
    extract::State,
    response::sse::{Event, Sse},
    routing::{get, post},
    Json, Router,
};
use futures_util::stream::Stream;
use std::sync::Arc;
use tokio::sync::broadcast;

/// Shared state handed to the HTTP handlers.
struct AppState {
    /// Sending half of the agent's input channel; `chat_handler` pushes user turns here.
    input_tx: mpsc::Sender<ControllerInputPayload>,
    /// Broadcast sender for agent events; `events_handler` subscribes to it once per SSE client.
    event_tx: broadcast::Sender<UiMessage>,
}

async fn chat_handler(
    State(state): State<Arc<AppState>>,
    Json(request): Json<ChatRequest>,
) -> impl IntoResponse {
    let payload = ControllerInputPayload::data(
        request.session_id,
        request.message,
        TurnId::new_user_turn(request.turn_number)
    );

    match state.input_tx.send(payload).await {
        Ok(_) => Json(json!({"status": "ok"})),
        Err(_) => Json(json!({"status": "error", "message": "Agent unavailable"})),
    }
}

/// Handles `GET /events`: streams agent events to the client as Server-Sent Events.
///
/// Each client gets its own subscription to the broadcast channel. Every
/// `UiMessage` becomes one SSE event whose name is derived from the variant
/// and whose data is the JSON-serialized message. The stream ends only when
/// the broadcast channel is closed.
async fn events_handler(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.event_tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            let event = match rx.recv().await {
                Ok(event) => event,
                // A slow client fell behind and missed some events. That is
                // recoverable: keep streaming from the oldest retained event
                // instead of silently terminating the connection.
                Err(broadcast::error::RecvError::Lagged(_)) => continue,
                // All senders are gone; nothing more will ever arrive.
                Err(broadcast::error::RecvError::Closed) => break,
            };

            let event_type = match &event {
                UiMessage::TextChunk { .. } => "text",
                UiMessage::ToolExecuting { .. } => "tool_start",
                UiMessage::ToolCompleted { .. } => "tool_end",
                UiMessage::Complete { .. } => "complete",
                UiMessage::Error { .. } => "error",
                _ => "event",
            };

            // Skip an event that cannot be serialized rather than panicking
            // inside the response stream.
            let data = match serde_json::to_string(&event) {
                Ok(data) => data,
                Err(_) => continue,
            };
            yield Ok(Event::default().event(event_type).data(data));
        }
    };

    Sse::new(stream)
}

/// Entry point: starts one background agent and serves `/chat` and `/events` on port 3000.
#[tokio::main]
async fn main() {
    // Input flows handler -> agent over mpsc; events flow agent -> clients over broadcast.
    let (input_tx, input_rx) = mpsc::channel(100);
    let (event_tx, _) = broadcast::channel(500);

    let shared = AppState {
        input_tx,
        event_tx: event_tx.clone(),
    };
    let state = Arc::new(shared);

    // Create and run agent
    let mut agent = AgentAir::with_config("sse-agent", CONFIG_PATH, SYSTEM_PROMPT)
        .expect("Failed to create agent");

    let policy = AutoApprovePolicy::new();
    let source = ChannelInputSource::new(input_rx);
    let sink = BroadcastEventSink::new(event_tx);

    tokio::spawn(async move {
        agent.run_with_frontend(sink, source, policy).await
    });

    let app = Router::new()
        .route("/chat", post(chat_handler))
        .route("/events", get(events_handler))
        .with_state(state);

    let addr: std::net::SocketAddr = "0.0.0.0:3000".parse().unwrap();
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

Client-Side

const eventSource = new EventSource('/events');

// The server sends each event as the JSON-serialized message. The WebSocket
// client in this guide reads the same serialization as `msg.TextChunk.text`,
// i.e. wrapped in the variant name, so unwrap that layer when it is present
// and fall back to the flat shape otherwise.
eventSource.addEventListener('text', (e) => {
    const data = JSON.parse(e.data);
    const chunk = data.TextChunk || data;
    appendToChat(chunk.text);
});

eventSource.addEventListener('complete', (e) => {
    markComplete();
});

eventSource.addEventListener('error', (e) => {
    // 'error' is also the name of EventSource's built-in connection-failure
    // event, which carries no `data`. Only server-sent error events have a
    // JSON payload; parsing the built-in one would throw.
    if (typeof e.data !== 'string') {
        return;
    }
    const data = JSON.parse(e.data);
    const failure = data.Error || data;
    showError(failure.message);
});

// Posts one user turn to the /chat endpoint as JSON.
async function sendMessage(text, sessionId, turnNumber) {
    const body = JSON.stringify({
        session_id: sessionId,
        message: text,
        turn_number: turnNumber,
    });
    const options = {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body,
    };

    await fetch('/chat', options);
}

Background Job Processor

Process agent tasks from a job queue without real-time user interaction. Useful for batch processing, scheduled tasks, and async workflows.

Architecture

Job Queue  →  Worker  →  Agent  →  Result Store
   ↑                                    │
   └────────── Retry on failure ────────┘

Implementation

use agent_air::{AgentAir, ControllerInputPayload, UiMessage, TurnId};

/// Worker that pulls jobs from a queue, runs each through an agent, and stores the result.
struct JobProcessor {
    /// Source of jobs; also used to acknowledge a job once its result is stored.
    queue: QueueClient,
    /// Destination for the text produced by each successfully processed job.
    results: ResultStore,
}

impl JobProcessor {
    /// Polls the queue forever, processing one job at a time.
    ///
    /// A failed job is logged and left unacknowledged. Errors from the queue
    /// or the result store are propagated and end the loop.
    async fn run(&self) -> Result<(), Error> {
        loop {
            // Fetch next job; back off for a second when the queue is empty
            let next = self.queue.receive().await?;
            let job = if let Some(job) = next {
                job
            } else {
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            };

            // Process the job
            let result = match self.process_job(&job).await {
                Ok(result) => result,
                Err(e) => {
                    log::error!("Job {} failed: {}", job.id, e);
                    // Job will be retried after visibility timeout
                    continue;
                }
            };

            // Persist first, acknowledge second
            self.results.store(&job.id, &result).await?;
            self.queue.acknowledge(&job.receipt).await?;
        }
    }

    /// Runs a single job through a freshly created agent and returns the
    /// concatenated text of every `TextChunk` event it emitted.
    async fn process_job(&self, job: &Job) -> Result<String, Error> {
        // Create a fresh agent for this job
        let mut agent = AgentAir::with_config(
            "job-processor",
            &job.config_path,
            &job.system_prompt
        )?;

        // Output side: events are collected from this channel
        let (output_tx, mut output_rx) = mpsc::channel(500);
        let sink = ChannelEventSink::new(output_tx);

        // Input side: a single-message source
        let (input_tx, input_rx) = mpsc::channel(1);
        let source = ChannelInputSource::new(input_rx);

        // Auto-approve for background jobs
        let policy = AutoApprovePolicy::new();

        // Start agent
        let agent_task = tokio::spawn(async move {
            agent.run_with_frontend(sink, source, policy).await
        });

        // Send the job's prompt, then close the input to signal completion
        let prompt = ControllerInputPayload::data(
            1,
            &job.prompt,
            TurnId::new_user_turn(1)
        );
        input_tx.send(prompt).await?;
        drop(input_tx);

        // Collect response until the agent closes the output channel
        let mut collected = String::new();
        while let Some(event) = output_rx.recv().await {
            match event {
                UiMessage::TextChunk { text, .. } => collected.push_str(&text),
                _ => {}
            }
        }

        // First `?` is the join error, second is the agent's own result
        agent_task.await??;

        Ok(collected)
    }
}

Multi-Tenant Server

Handle multiple users with separate sessions, routing messages and events by user/session ID.

Architecture

Users  →  API Gateway  →  Session Router  →  Agent Sessions

                          Session Store

Implementation

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Owns the per-user agent sessions of a multi-tenant server.
struct SessionManager {
    /// Live sessions keyed by user id; shared with the background agent tasks,
    /// which remove their own entry when the agent exits.
    sessions: Arc<RwLock<HashMap<String, SessionHandle>>>,
    /// Configuration path passed to every newly created agent.
    config_path: String,
    /// System prompt passed to every newly created agent.
    system_prompt: String,
}

/// Handle to one running agent session.
///
/// `SessionManager` stores one handle in its session map and returns copies
/// to callers via `.clone()`, so the type must implement `Clone`.
#[derive(Clone)]
struct SessionHandle {
    /// Sending half of the session's input channel.
    input_tx: mpsc::Sender<ControllerInputPayload>,
    /// Identifier passed along with every payload sent to this session.
    session_id: i64,
    /// When the session was created.
    created_at: Instant,
}

impl SessionManager {
    /// Returns the session registered for `user_id`, creating one if needed.
    ///
    /// Creating a session starts a background agent task, registers the
    /// handle in the session map, and passes the event receiver to
    /// `store_event_receiver`. At most one session per user is created even
    /// when several callers race for the same user.
    ///
    /// # Errors
    /// Returns an error if the agent cannot be created.
    async fn get_or_create_session(&self, user_id: &str) -> Result<SessionHandle, Error> {
        // Fast path: an existing session only needs the read lock.
        {
            let sessions = self.sessions.read().await;
            if let Some(handle) = sessions.get(user_id) {
                return Ok(handle.clone());
            }
        }

        // Slow path: take the write lock and check again. Another caller may
        // have created the session between the read above and this point;
        // without the re-check both callers would spawn an agent and the
        // second insert would silently replace the first.
        // The lock is held while the agent is constructed, which serializes
        // session creation; there is no `.await` until it is released.
        let mut sessions = self.sessions.write().await;
        if let Some(handle) = sessions.get(user_id) {
            return Ok(handle.clone());
        }

        // Create new session
        let (input_tx, input_rx) = mpsc::channel(100);
        let (output_tx, output_rx) = mpsc::channel(500);

        let session_id = self.generate_session_id();

        let mut agent = AgentAir::with_config(
            &format!("user-{}", user_id),
            &self.config_path,
            &self.system_prompt
        )?;

        let sink = ChannelEventSink::new(output_tx);
        let source = ChannelInputSource::new(input_rx);
        let policy = AutoApprovePolicy::new();

        // Run agent in background
        let user_id_clone = user_id.to_string();
        let registry = self.sessions.clone();
        tokio::spawn(async move {
            let result = agent.run_with_frontend(sink, source, policy).await;

            // Cleanup on exit. This cannot run before the insert below,
            // because the write lock is still held there. Only remove the
            // entry if it is still this session: the map may already hold a
            // newer session for the same user.
            // NOTE(review): assumes generate_session_id() never repeats an id.
            {
                let mut registry = registry.write().await;
                let still_ours = registry
                    .get(&user_id_clone)
                    .map_or(false, |current| current.session_id == session_id);
                if still_ours {
                    registry.remove(&user_id_clone);
                }
            }

            if let Err(e) = result {
                log::error!("Session {} error: {}", user_id_clone, e);
            }
        });

        let handle = SessionHandle {
            input_tx,
            session_id,
            created_at: Instant::now(),
        };

        sessions.insert(user_id.to_string(), handle.clone());
        // Release the lock before awaiting anything else.
        drop(sessions);

        // Store event receiver somewhere accessible
        self.store_event_receiver(user_id, output_rx).await;

        Ok(handle)
    }

    /// Sends `message` to the user's session as user turn number `turn`,
    /// creating the session first if it does not exist.
    ///
    /// # Errors
    /// Returns `Error::SessionClosed` if the session's input channel is closed.
    async fn send_message(&self, user_id: &str, message: &str, turn: u64) -> Result<(), Error> {
        let handle = self.get_or_create_session(user_id).await?;

        let payload = ControllerInputPayload::data(
            handle.session_id,
            message,
            TurnId::new_user_turn(turn)
        );

        handle.input_tx.send(payload).await
            .map_err(|_| Error::SessionClosed)?;

        Ok(())
    }

    /// Removes every session older than `max_idle` from the session map.
    ///
    /// NOTE(review): age is measured from `created_at`, so this is time since
    /// creation rather than time since last activity; a busy session is
    /// removed as well once it is older than `max_idle`. Tracking a
    /// last-activity timestamp would need a new field on `SessionHandle`.
    async fn cleanup_idle_sessions(&self, max_idle: Duration) {
        let mut sessions = self.sessions.write().await;
        let now = Instant::now();

        sessions.retain(|user_id, handle| {
            if now.duration_since(handle.created_at) > max_idle {
                log::info!("Cleaning up idle session for {}", user_id);
                false
            } else {
                true
            }
        });
    }
}

Slack/Discord Bot

Integrate with chat platforms by translating incoming platform messages into agent inputs and posting the agent's responses back to the platform.

Architecture

Chat Platform API  ←→  Bot Handler  ←→  Agent
       │                    │
   Webhooks/Events    Session per channel

Implementation Pattern

/// Bridges a chat platform and agent sessions.
struct SlackBot {
    /// One agent session per chat channel.
    agent_sessions: HashMap<String, SessionHandle>,  // channel_id -> session
    /// Client used to post messages and typing indicators back to the platform.
    slack_client: SlackClient,
}

impl SlackBot {
    async fn handle_message(&mut self, event: SlackMessage) {
        let channel_id = &event.channel;

        // Get or create session for this channel
        let session = self.get_or_create_session(channel_id).await;

        // Send message to agent
        let payload = ControllerInputPayload::data(
            session.session_id,
            &event.text,
            TurnId::new_user_turn(session.next_turn())
        );
        session.input_tx.send(payload).await.unwrap();
    }

    async fn run_event_forwarder(&self, channel_id: String, mut rx: mpsc::Receiver<UiMessage>) {
        let mut response_buffer = String::new();

        while let Some(event) = rx.recv().await {
            match event {
                UiMessage::TextChunk { text, .. } => {
                    response_buffer.push_str(&text);
                }
                UiMessage::Complete { .. } => {
                    // Send accumulated response to Slack
                    if !response_buffer.is_empty() {
                        self.slack_client.post_message(&channel_id, &response_buffer).await;
                        response_buffer.clear();
                    }
                }
                UiMessage::ToolExecuting { tool_name, .. } => {
                    // Optionally show typing indicator or status
                    self.slack_client.show_typing(&channel_id).await;
                }
                UiMessage::Error { message, .. } => {
                    self.slack_client.post_message(&channel_id, &format!("Error: {}", message)).await;
                }
                _ => {}
            }
        }
    }
}

Testing Integration

Test your integration with mock agents and controlled inputs.

/// End-to-end check: a message sent over the WebSocket yields a non-empty
/// streamed response terminated by a `Complete` event.
#[tokio::test]
async fn test_websocket_integration() {
    // Start server
    let server = start_test_server().await;

    // Connect WebSocket client
    let (mut ws, _) = connect_async(&server.ws_url()).await.unwrap();

    // Send a message
    let hello = r#"{"content":"Hello"}"#.to_string();
    ws.send(Message::Text(hello)).await.unwrap();

    // Collect text chunks until the agent reports completion
    let mut collected = String::new();
    while let Some(Ok(frame)) = ws.next().await {
        // Non-text frames carry no agent events
        let raw = match frame {
            Message::Text(raw) => raw,
            _ => continue,
        };

        match serde_json::from_str::<UiMessage>(&raw).unwrap() {
            UiMessage::TextChunk { text, .. } => collected.push_str(&text),
            UiMessage::Complete { .. } => break,
            _ => {}
        }
    }

    assert!(!collected.is_empty());
}

Mock the LLM for deterministic testing, or use a test configuration that limits capabilities for faster iteration.