Async Runtime
Agent Air uses Tokio as its async runtime, providing non-blocking I/O for LLM API calls, concurrent tool execution, and event-driven message passing. This page explains how the runtime is configured and how background tasks coordinate through channels.
Runtime Initialization
The Tokio runtime is created during AgentAir::new():
```rust
let runtime = Runtime::new().map_err(|e| {
    io::Error::new(
        io::ErrorKind::Other,
        format!("Failed to create runtime: {}", e),
    )
})?;
```
This creates a multi-threaded runtime with default settings:
- Worker threads based on CPU count
- I/O driver for async networking
- Time driver for timeouts and delays
The runtime is stored as a field in AgentAir and used throughout the agent’s lifecycle:
```rust
pub struct AgentAir {
    runtime: Runtime,
    // ... other fields
}
```
Background Tasks
When start_background_tasks() is called, AgentAir spawns several long-running tasks:
1. Controller Event Loop
The LLMController runs its main event loop as a background task:
```rust
let controller = self.controller.clone();
self.runtime.spawn(async move {
    controller.start().await;
});
```
This task runs the 6-channel select! loop that processes:
- LLM responses
- User input
- Tool results
- Batch results
- User interactions
- Permission requests
2. Input Router
The InputRouter forwards messages from the TUI channel to the controller:
```rust
if let Some(to_controller_rx) = self.to_controller_rx.take() {
    let router = InputRouter::new(
        self.controller.clone(),
        to_controller_rx,
        self.cancel_token.clone(),
    );
    self.runtime.spawn(async move {
        router.run().await;
    });
}
```
The router loops on the receiver channel:
```rust
impl InputRouter {
    pub async fn run(mut self) {
        loop {
            tokio::select! {
                _ = self.cancel_token.cancelled() => {
                    break;
                }
                msg = self.rx.recv() => {
                    match msg {
                        Some(payload) => {
                            self.controller.send_input(payload).await;
                        }
                        None => break,
                    }
                }
            }
        }
    }
}
```
3. User Interaction Event Forwarder
A task forwards user interaction events from the registry to the TUI:
```rust
let ui_tx_for_interactions = from_controller_tx.clone();
runtime.spawn(async move {
    while let Some(event) = interaction_event_rx.recv().await {
        let msg = convert_controller_event_to_ui_message(event);
        if let Err(e) = ui_tx_for_interactions.try_send(msg) {
            tracing::warn!("Failed to send user interaction event to UI: {}", e);
        }
    }
});
```
4. Permission Event Forwarder
Similarly, permission events are forwarded to the TUI:
```rust
let ui_tx_for_permissions = from_controller_tx.clone();
runtime.spawn(async move {
    while let Some(event) = permission_event_rx.recv().await {
        let msg = convert_controller_event_to_ui_message(event);
        if let Err(e) = ui_tx_for_permissions.try_send(msg) {
            tracing::warn!("Failed to send permission event to UI: {}", e);
        }
    }
});
```
Channel Architecture
Communication between tasks uses Tokio’s MPSC (multi-producer, single-consumer) channels:
```rust
use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel::<MessageType>(buffer_size);
```
Channel Pairs
| Channel | Type Aliases | Purpose |
|---|---|---|
| TUI to Controller | ToControllerTx, ToControllerRx | User input, control commands |
| Controller to TUI | FromControllerTx, FromControllerRx | UI updates, events |
| Session to Controller | - | LLM responses, streaming |
| Tool Results | - | Individual tool completion |
| Batch Results | - | All tools in batch complete |
| Interaction Events | - | User question requests |
| Permission Events | - | Permission requests |
Buffer Sizing
All channels use a default buffer size of 100:
```rust
pub const DEFAULT_CHANNEL_SIZE: usize = 100;

let (to_controller_tx, to_controller_rx) =
    mpsc::channel::<ControllerInputPayload>(DEFAULT_CHANNEL_SIZE);
```
This buffer size provides:
- Backpressure to prevent memory exhaustion
- Smoothing for bursty message patterns
- Room for multiple tool results to queue
Non-blocking Sends
Event handlers use try_send() to avoid blocking:
```rust
if let Err(e) = ui_tx.try_send(msg) {
    tracing::warn!("Failed to send controller event to UI: {}", e);
}
```
This prevents deadlocks when the receiver is slow or the buffer is full. Messages are logged and dropped rather than blocking the sender.
Cancellation Token
Graceful shutdown uses tokio_util::sync::CancellationToken:
```rust
use tokio_util::sync::CancellationToken;

let cancel_token = CancellationToken::new();
```
Token Distribution
The token is cloned and passed to background tasks:
```rust
let router = InputRouter::new(
    self.controller.clone(),
    to_controller_rx,
    self.cancel_token.clone(), // Token cloned here
);
```
Checking Cancellation
Tasks check for cancellation in their select loops:
```rust
tokio::select! {
    _ = self.cancel_token.cancelled() => {
        tracing::info!("Task cancelled");
        break;
    }
    msg = self.rx.recv() => {
        // Handle message
    }
}
```
Triggering Shutdown
The shutdown() method cancels the token:
```rust
pub fn shutdown(&self) {
    tracing::info!("{} shutting down", self.name);
    self.cancel_token.cancel(); // Signal all tasks

    let controller = self.controller.clone();
    self.runtime.block_on(async move {
        controller.shutdown().await;
    });

    tracing::info!("{} shutdown complete", self.name);
}
```
Bridging Sync and Async
Since the TUI event loop (App::run()) is synchronous, bridge methods use block_on:
Creating Sessions
```rust
pub fn create_initial_session(&mut self) -> Result<(i64, String, i32), AgentError> {
    // ...
    let session_id = self.runtime.block_on(Self::create_session_internal(
        &controller,
        config.clone(),
        &tool_definitions,
    ))?;
    // ...
}
```
Shutdown
```rust
pub fn shutdown(&self) {
    let controller = self.controller.clone();
    self.runtime.block_on(async move {
        controller.shutdown().await;
    });
}
```
Runtime Handle
For code that needs to spawn tasks without owning the runtime:
```rust
pub fn runtime_handle(&self) -> tokio::runtime::Handle {
    self.runtime.handle().clone()
}
```
The handle can spawn tasks on the runtime:
```rust
let handle = agent.runtime_handle();
handle.spawn(async move {
    // Async work
});
```
Task Spawning Patterns
Fire and Forget
For tasks that run independently:
```rust
self.runtime.spawn(async move {
    controller.start().await;
});
```
With Result Collection
For tasks whose results are needed:
```rust
let handle = self.runtime.spawn(async move {
    do_async_work().await
});

// Later, if needed:
let result = self.runtime.block_on(handle)?;
```
Parallel Tool Execution
ToolExecutor spawns parallel tasks for each tool:
```rust
for tool_request in requests {
    let registry = self.registry.clone();
    let result_tx = self.tool_result_tx.clone();
    tokio::spawn(async move {
        let result = execute_tool(&registry, tool_request).await;
        result_tx.send(result).await.ok();
    });
}
```
Async Patterns in LLMSession
Streaming Responses
LLMSession processes streamed API responses:
```rust
let stream = self.client.send_message_stream(messages, options).await?;
pin_mut!(stream);

while let Some(event) = stream.next().await {
    match event {
        Ok(StreamEvent::TextDelta(text)) => {
            self.from_llm_tx.send(FromLLMPayload::TextChunk {
                session_id: self.id,
                text,
            }).await?;
        }
        // Handle other events
    }
}
```
Concurrent Read/Write Access
Session state uses RwLock for concurrent access:
```rust
pub struct LLMSession {
    conversation: RwLock<Arc<Vec<Message>>>,
    tool_definitions: RwLock<Vec<LLMTool>>,
}

// Reading (allows multiple readers)
let conversation = self.conversation.read().await;

// Writing (exclusive access; make_mut clones the Vec if the Arc is shared)
let mut conversation = self.conversation.write().await;
Arc::make_mut(&mut *conversation).push(message);
```
Error Handling in Async Code
Async errors are propagated with ? and logged:
```rust
async fn handle_message(&self, payload: ControllerInputPayload) {
    if let Err(e) = self.process_message(payload).await {
        tracing::error!("Failed to process message: {}", e);
        self.emit_error(e.to_string()).await;
    }
}
```
Channel send errors use fallible sends:
```rust
// Non-blocking, logs on failure
if let Err(e) = tx.try_send(msg) {
    tracing::warn!("Channel full or closed: {}", e);
}

// Blocking, propagates error
tx.send(msg).await?;
```
Performance Considerations
Avoid Blocking in Async
Never use blocking operations in async tasks:
```rust
// Bad - blocks the runtime thread
let result = std::fs::read_to_string("file.txt")?;

// Good - uses async I/O
let result = tokio::fs::read_to_string("file.txt").await?;
```
Minimize Lock Contention
Release locks before awaiting:
```rust
// Bad - holds lock across await
let guard = self.data.lock().await;
do_async_work(&guard).await; // Lock held during await

// Good - clone data, release lock
let data = {
    let guard = self.data.lock().await;
    guard.clone()
};
do_async_work(&data).await;
```
Use Arc for Shared Ownership
Clone Arc references instead of the runtime:
```rust
let controller = self.controller.clone(); // Clones Arc, not data
self.runtime.spawn(async move {
    controller.process().await;
});
```
Next Steps
- AgentAir Struct - The struct that owns the runtime
- Message Handling - The 6-channel select! pattern
- Controller Events - Events flowing through channels
