Input Source
The InputSource trait defines how your application provides user input to the agent. Every message, command, and interaction flows through your source implementation. By implementing this trait, you control where input comes from—HTTP requests, WebSocket messages, message queues, or programmatic calls.
The source abstraction makes the agent input-agnostic. Whether messages arrive from a web form, a chat platform API, or a scheduled job, the agent processes them identically. This uniformity simplifies testing and enables the same agent to serve multiple input channels.
The InputSource Trait
The trait has a single method that returns the next input payload asynchronously. Returning None signals that the source is closed and the agent should shut down.
pub trait InputSource: Send {
/// Receive the next input payload, or None if the source is closed
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>>;
}
The method is async to support sources that need to wait for input—network requests, queue polling, or user interaction. The agent calls recv() repeatedly until it returns None.
ControllerInputPayload
Every input to the agent is wrapped in a ControllerInputPayload struct that includes the message content along with routing and sequencing information.
pub struct ControllerInputPayload {
/// Which session this message belongs to
pub session_id: i64,
/// The user's message content
pub content: String,
/// Sequence identifier for this turn
pub turn_id: TurnId,
/// Optional metadata
pub metadata: Option<HashMap<String, Value>>,
}
Creating Payloads
Use the constructor methods to create payloads:
// Standard user message
let payload = ControllerInputPayload::data(
session_id,
"What is the weather today?",
TurnId::new_user_turn(1)
);
// With metadata
let mut metadata = HashMap::new();
metadata.insert("user_id".to_string(), json!("user123"));
let payload = ControllerInputPayload {
session_id,
content: "Hello".to_string(),
turn_id: TurnId::new_user_turn(2),
metadata: Some(metadata),
};
Turn IDs
Turn IDs sequence the conversation and help track which response corresponds to which input. Increment the turn number for each user message in a session.
// First user message
TurnId::new_user_turn(1)
// Second user message
TurnId::new_user_turn(2)
// Assistant responses use their own sequence
TurnId::new_assistant_turn(1)
Built-in Implementation
Agent Air provides a channel-based input source for simple integrations.
ChannelInputSource
Wraps an async channel receiver, accepting input from a corresponding sender. This works well when input arrives from multiple sources that you want to multiplex into a single stream.
use agent_air::source::ChannelInputSource;
use tokio::sync::mpsc;
let (tx, rx) = mpsc::channel(100);
let source = ChannelInputSource::new(rx);
// Send input from anywhere
tx.send(ControllerInputPayload::data(
session_id,
"User message",
TurnId::new_user_turn(1)
)).await?;
// Close by dropping the sender
drop(tx); // Source will return None on next recv()
The channel naturally handles concurrency—multiple tasks can send to the same channel, and messages arrive in order.
Implementing Custom Sources
Create custom sources by implementing the InputSource trait. Your implementation should handle whatever input mechanism your application uses.
HTTP Request Source
A source that receives input from an HTTP endpoint, one request at a time:
use agent_air::{InputSource, ControllerInputPayload, TurnId};
use tokio::sync::mpsc;
use std::pin::Pin;
use std::future::Future;
pub struct HttpRequestSource {
rx: mpsc::Receiver<HttpRequest>,
turn_counter: u64,
}
impl HttpRequestSource {
pub fn new(rx: mpsc::Receiver<HttpRequest>) -> Self {
Self { rx, turn_counter: 0 }
}
}
impl InputSource for HttpRequestSource {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>> {
Box::pin(async move {
let request = self.rx.recv().await?;
self.turn_counter += 1;
Some(ControllerInputPayload {
session_id: request.session_id,
content: request.message,
turn_id: TurnId::new_user_turn(self.turn_counter),
metadata: request.metadata,
})
})
}
}
Your HTTP handler sends requests to the channel, and the source converts them to payloads for the agent.
WebSocket Source
A source that reads from a WebSocket connection:
use agent_air::{InputSource, ControllerInputPayload, TurnId};
use tokio_tungstenite::WebSocketStream;
use futures_util::StreamExt;
pub struct WebSocketSource {
stream: WebSocketStream<TcpStream>,
session_id: i64,
turn_counter: u64,
}
impl InputSource for WebSocketSource {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>> {
Box::pin(async move {
loop {
match self.stream.next().await {
Some(Ok(Message::Text(text))) => {
self.turn_counter += 1;
// Parse the message (adjust based on your protocol)
let parsed: ClientMessage = serde_json::from_str(&text).ok()?;
return Some(ControllerInputPayload::data(
self.session_id,
parsed.content,
TurnId::new_user_turn(self.turn_counter)
));
}
Some(Ok(Message::Close(_))) | None => return None,
Some(Ok(_)) => continue, // Ignore ping/pong/binary
Some(Err(e)) => {
log::error!("WebSocket error: {}", e);
return None;
}
}
}
})
}
}
The source reads messages until the connection closes, converting each text message into an agent input.
Queue-Based Source
A source that polls a message queue for work:
use agent_air::{InputSource, ControllerInputPayload, TurnId};
pub struct QueueSource {
queue_client: QueueClient,
poll_interval: Duration,
}
impl InputSource for QueueSource {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>> {
Box::pin(async move {
loop {
match self.queue_client.receive_message().await {
Ok(Some(message)) => {
let job: JobPayload = serde_json::from_str(&message.body).ok()?;
return Some(ControllerInputPayload {
session_id: job.session_id,
content: job.prompt,
turn_id: TurnId::new_user_turn(job.turn_number),
metadata: job.metadata,
});
}
Ok(None) => {
// No messages, wait before polling again
tokio::time::sleep(self.poll_interval).await;
}
Err(e) => {
log::error!("Queue error: {}", e);
tokio::time::sleep(self.poll_interval).await;
}
}
}
})
}
}
This source never returns None unless you explicitly signal shutdown, making it suitable for long-running workers.
Session Routing
When your source handles multiple users or conversations, include the correct session ID in each payload. The agent routes messages to the appropriate session based on this ID.
impl InputSource for MultiUserSource {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>> {
Box::pin(async move {
let request = self.requests.recv().await?;
// Look up or create session for this user
let session_id = self.session_map
.entry(request.user_id.clone())
.or_insert_with(|| self.next_session_id());
Some(ControllerInputPayload::data(
*session_id,
request.message,
TurnId::new_user_turn(request.turn_number)
))
})
}
}
Maintain a mapping from your user identifiers to agent session IDs. Create new sessions as needed and clean them up when users disconnect.
Graceful Shutdown
Signal shutdown by returning None from recv(). The agent will complete any in-progress work and then exit run_with_frontend().
pub struct GracefulSource {
rx: mpsc::Receiver<ControllerInputPayload>,
shutdown: Arc<AtomicBool>,
}
impl InputSource for GracefulSource {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>> {
Box::pin(async move {
loop {
tokio::select! {
payload = self.rx.recv() => return payload,
_ = self.wait_for_shutdown() => return None,
}
}
})
}
}
impl GracefulSource {
async fn wait_for_shutdown(&self) {
while !self.shutdown.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::Relaxed);
}
}
This pattern lets you trigger shutdown from signal handlers or administrative endpoints.
Error Handling
Sources should handle transient errors internally when possible, logging issues but continuing to operate. Return None only when the source is truly exhausted or a fatal error occurs.
impl InputSource for ResilientSource {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>> {
Box::pin(async move {
loop {
match self.try_recv().await {
Ok(Some(payload)) => return Some(payload),
Ok(None) => return None, // Clean shutdown
Err(e) if e.is_transient() => {
log::warn!("Transient error: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
Err(e) => {
log::error!("Fatal error: {}", e);
return None;
}
}
}
})
}
}
Distinguish between errors that warrant retry (network timeouts, rate limits) and errors that indicate permanent failure (invalid credentials, resource not found).
Batching and Rate Limiting
For high-throughput scenarios, consider batching or rate limiting in your source to prevent overwhelming the agent or downstream systems.
pub struct RateLimitedSource {
inner: Box<dyn InputSource>,
rate_limiter: RateLimiter,
}
impl InputSource for RateLimitedSource {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Option<ControllerInputPayload>> + Send + '_>> {
Box::pin(async move {
// Wait for rate limit permit
self.rate_limiter.acquire().await;
// Then receive from inner source
self.inner.recv().await
})
}
}
Rate limiting at the source prevents queue buildup and provides predictable throughput.
Testing Sources
Test your sources with mock inputs to verify correct payload construction and error handling.
#[tokio::test]
async fn test_http_source() {
let (tx, rx) = mpsc::channel(10);
let mut source = HttpRequestSource::new(rx);
// Send a mock request
tx.send(HttpRequest {
session_id: 1,
message: "Hello".to_string(),
metadata: None,
}).await.unwrap();
// Verify payload
let payload = source.recv().await.unwrap();
assert_eq!(payload.session_id, 1);
assert_eq!(payload.content, "Hello");
assert_eq!(payload.turn_id, TurnId::new_user_turn(1));
// Test shutdown
drop(tx);
assert!(source.recv().await.is_none());
}
Test edge cases like empty messages, malformed input, and connection drops to ensure robust operation.
