Async Runtime

Agent Air uses Tokio as its async runtime, providing non-blocking I/O for LLM API calls, concurrent tool execution, and event-driven message passing. This page explains how the runtime is configured and how background tasks coordinate through channels.

Runtime Initialization

The Tokio runtime is created during AgentAir::new():

let runtime = Runtime::new().map_err(|e| {
    io::Error::new(
        io::ErrorKind::Other,
        format!("Failed to create runtime: {}", e),
    )
})?;

This creates a multi-threaded runtime with default settings:

  • Worker threads based on CPU count
  • I/O driver for async networking
  • Time driver for timeouts and delays

The runtime is stored as a field in AgentAir and used throughout the agent’s lifecycle:

pub struct AgentAir {
    runtime: Runtime,
    // ... other fields
}

Background Tasks

When start_background_tasks() is called, AgentAir spawns several long-running tasks:

1. Controller Event Loop

The LLMController runs its main event loop as a background task:

let controller = self.controller.clone();
self.runtime.spawn(async move {
    controller.start().await;
});

This task runs the 6-channel select! loop that processes:

  • LLM responses
  • User input
  • Tool results
  • Batch results
  • User interactions
  • Permission requests

2. Input Router

The InputRouter forwards messages from the TUI channel to the controller:

if let Some(to_controller_rx) = self.to_controller_rx.take() {
    let router = InputRouter::new(
        self.controller.clone(),
        to_controller_rx,
        self.cancel_token.clone(),
    );
    self.runtime.spawn(async move {
        router.run().await;
    });
}

The router loops on the receiver channel:

impl InputRouter {
    pub async fn run(mut self) {
        loop {
            tokio::select! {
                _ = self.cancel_token.cancelled() => {
                    break;
                }
                msg = self.rx.recv() => {
                    match msg {
                        Some(payload) => {
                            self.controller.send_input(payload).await;
                        }
                        None => break,
                    }
                }
            }
        }
    }
}

3. User Interaction Event Forwarder

A task forwards user interaction events from the registry to the TUI:

let ui_tx_for_interactions = from_controller_tx.clone();
runtime.spawn(async move {
    while let Some(event) = interaction_event_rx.recv().await {
        let msg = convert_controller_event_to_ui_message(event);
        if let Err(e) = ui_tx_for_interactions.try_send(msg) {
            tracing::warn!("Failed to send user interaction event to UI: {}", e);
        }
    }
});

4. Permission Event Forwarder

Similarly, permission events are forwarded to the TUI:

let ui_tx_for_permissions = from_controller_tx.clone();
runtime.spawn(async move {
    while let Some(event) = permission_event_rx.recv().await {
        let msg = convert_controller_event_to_ui_message(event);
        if let Err(e) = ui_tx_for_permissions.try_send(msg) {
            tracing::warn!("Failed to send permission event to UI: {}", e);
        }
    }
});

Channel Architecture

Communication between tasks uses Tokio’s MPSC (multi-producer, single-consumer) channels:

use tokio::sync::mpsc;

let (tx, rx) = mpsc::channel::<MessageType>(buffer_size);
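The multi-producer, single-consumer shape also exists in the standard library, which makes it easy to demonstrate outside the runtime. The sketch below (a std-library analog, not Agent Air code) shows several producers feeding one receiver over sender clones; Tokio's mpsc adds async recv and a bounded buffer on top of the same model.

```rust
use std::sync::mpsc;
use std::thread;

// Collects one message from each of three producer threads over a single
// mpsc channel: many senders, exactly one receiver.
fn gather_from_producers() -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();
    for i in 0..3 {
        let tx = tx.clone(); // each producer gets its own sender clone
        thread::spawn(move || {
            tx.send(i).unwrap();
        });
    }
    drop(tx); // drop the original sender so the receiver sees the channel close
    let mut values: Vec<u32> = rx.iter().collect();
    values.sort(); // arrival order is nondeterministic
    values
}

fn main() {
    assert_eq!(gather_from_producers(), vec![0, 1, 2]);
}
```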

Channel Pairs

| Channel                | Type Aliases                       | Purpose                       |
|------------------------|------------------------------------|-------------------------------|
| TUI to Controller      | ToControllerTx, ToControllerRx     | User input, control commands  |
| Controller to TUI      | FromControllerTx, FromControllerRx | UI updates, events            |
| Session to Controller  | -                                  | LLM responses, streaming      |
| Tool Results           | -                                  | Individual tool completion    |
| Batch Results          | -                                  | All tools in batch complete   |
| Interaction Events     | -                                  | User question requests        |
| Permission Events      | -                                  | Permission requests           |

Buffer Sizing

All channels use a default buffer size of 100:

pub const DEFAULT_CHANNEL_SIZE: usize = 100;

let (to_controller_tx, to_controller_rx) =
    mpsc::channel::<ControllerInputPayload>(DEFAULT_CHANNEL_SIZE);

This buffer size provides:

  • Backpressure to prevent memory exhaustion
  • Smoothing for bursty message patterns
  • Room for multiple tool results to queue
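The backpressure property can be seen with the standard library's bounded channel, a stand-in here for tokio::sync::mpsc::channel(buffer_size): once the buffer holds its capacity in un-received messages, a non-blocking send fails instead of growing memory without bound.

```rust
use std::sync::mpsc;

// Fills a bounded channel past capacity and reports how many sends were
// accepted and whether any were rejected by the full buffer.
fn fill_bounded_channel(capacity: usize) -> (usize, bool) {
    let (tx, _rx) = mpsc::sync_channel::<u64>(capacity); // keep _rx alive
    let mut accepted = 0;
    let mut rejected = false;
    for i in 0..(capacity + 1) {
        match tx.try_send(i as u64) {
            Ok(()) => accepted += 1,
            Err(_) => rejected = true, // buffer full: caller must drop or retry
        }
    }
    (accepted, rejected)
}

fn main() {
    // A buffer of 100 accepts exactly 100 un-received messages.
    assert_eq!(fill_bounded_channel(100), (100, true));
}
```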

Non-blocking Sends

Event handlers use try_send() to avoid blocking:

if let Err(e) = ui_tx.try_send(msg) {
    tracing::warn!("Failed to send controller event to UI: {}", e);
}

This prevents deadlocks when the receiver is slow or the buffer is full. Messages are logged and dropped rather than blocking the sender.

Cancellation Token

Graceful shutdown uses tokio_util::sync::CancellationToken:

use tokio_util::sync::CancellationToken;

let cancel_token = CancellationToken::new();

Token Distribution

The token is cloned and passed to background tasks:

let router = InputRouter::new(
    self.controller.clone(),
    to_controller_rx,
    self.cancel_token.clone(),  // Token cloned here
);

Checking Cancellation

Tasks check for cancellation in their select loops:

tokio::select! {
    _ = self.cancel_token.cancelled() => {
        tracing::info!("Task cancelled");
        break;
    }
    msg = self.rx.recv() => {
        // Handle message
    }
}
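A minimal std-library analog of this pattern (not the project's code) uses a shared atomic flag that the worker checks each iteration. CancellationToken does the same job for async tasks, with the advantage that cancelled() wakes an awaiting task immediately rather than relying on polling.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Runs a worker loop until a shared cancellation flag is set, then joins
// it and returns how many iterations it completed.
fn run_until_cancelled() -> u32 {
    let cancel = Arc::new(AtomicBool::new(false));
    let worker_cancel = Arc::clone(&cancel);

    let worker = thread::spawn(move || {
        let mut iterations = 0u32;
        while !worker_cancel.load(Ordering::Relaxed) {
            iterations += 1;
            if iterations >= 10 {
                break; // safety bound so the example always terminates
            }
        }
        iterations
    });

    cancel.store(true, Ordering::Relaxed); // signal shutdown
    worker.join().unwrap()
}

fn main() {
    assert!(run_until_cancelled() <= 10);
}
```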

Triggering Shutdown

The shutdown() method cancels the token:

pub fn shutdown(&self) {
    tracing::info!("{} shutting down", self.name);
    self.cancel_token.cancel();  // Signal all tasks

    let controller = self.controller.clone();
    self.runtime.block_on(async move {
        controller.shutdown().await;
    });

    tracing::info!("{} shutdown complete", self.name);
}

Bridging Sync and Async

Since the TUI event loop (App::run()) is synchronous, bridge methods use block_on:

Creating Sessions

pub fn create_initial_session(&mut self) -> Result<(i64, String, i32), AgentError> {
    // ...
    let session_id = self.runtime.block_on(Self::create_session_internal(
        &controller,
        config.clone(),
        &tool_definitions,
    ))?;
    // ...
}

Shutdown

pub fn shutdown(&self) {
    let controller = self.controller.clone();
    self.runtime.block_on(async move {
        controller.shutdown().await;
    });
}

Runtime Handle

For code that needs to spawn tasks without owning the runtime:

pub fn runtime_handle(&self) -> tokio::runtime::Handle {
    self.runtime.handle().clone()
}

The handle can spawn tasks on the runtime:

let handle = agent.runtime_handle();
handle.spawn(async move {
    // Async work
});

Task Spawning Patterns

Fire and Forget

For tasks that run independently:

self.runtime.spawn(async move {
    controller.start().await;
});

With Result Collection

For tasks whose results are needed:

let handle = self.runtime.spawn(async move {
    do_async_work().await
});

// Later, if needed:
let result = self.runtime.block_on(handle)?;
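The same spawn-then-collect shape can be sketched with std threads, where JoinHandle::join plays the role that awaiting a tokio JoinHandle (or block_on(handle)) plays on the async runtime. The closure body is an illustrative stand-in for do_async_work().

```rust
use std::thread;

// Spawns a worker, does other things in the meantime, then collects its
// result by joining the handle.
fn spawn_and_collect() -> u64 {
    let handle = thread::spawn(|| {
        (1..=10u64).sum() // stand-in for do_async_work()
    });
    handle.join().expect("worker panicked")
}

fn main() {
    assert_eq!(spawn_and_collect(), 55);
}
```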

Parallel Tool Execution

ToolExecutor spawns parallel tasks for each tool:

for tool_request in requests {
    let registry = self.registry.clone();
    let result_tx = self.tool_result_tx.clone();

    tokio::spawn(async move {
        let result = execute_tool(&registry, tool_request).await;
        result_tx.send(result).await.ok();
    });
}
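The fan-out shape above, one spawned task per request with each task holding a clone of the result sender, can be reproduced with std threads and channels. This is a hedged sketch: the doubling stands in for execute_tool(), and the request values are illustrative.

```rust
use std::sync::mpsc;
use std::thread;

// Executes each request on its own thread and funnels all results into a
// single channel, mirroring ToolExecutor's parallel fan-out.
fn execute_in_parallel(requests: Vec<u32>) -> Vec<u32> {
    let (result_tx, result_rx) = mpsc::channel::<u32>();
    let count = requests.len();
    for request in requests {
        let tx = result_tx.clone(); // one sender clone per task
        thread::spawn(move || {
            let result = request * 2; // stand-in for execute_tool()
            tx.send(result).ok(); // ignore send errors, as the real code does
        });
    }
    drop(result_tx); // only the clones held by workers remain
    let mut results: Vec<u32> = result_rx.iter().take(count).collect();
    results.sort(); // completion order is nondeterministic
    results
}

fn main() {
    assert_eq!(execute_in_parallel(vec![1, 2, 3]), vec![2, 4, 6]);
}
```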

Async Patterns in LLMSession

Streaming Responses

LLMSession processes streamed API responses:

let stream = self.client.send_message_stream(messages, options).await?;

// pin_mut! comes from the futures crate; the stream must be pinned before it can be polled
pin_mut!(stream);
while let Some(event) = stream.next().await {
    match event {
        Ok(StreamEvent::TextDelta(text)) => {
            self.from_llm_tx.send(FromLLMPayload::TextChunk {
                session_id: self.id,
                text,
            }).await?;
        }
        // Handle other events
    }
}

Concurrent Read/Write Access

Session state uses RwLock for concurrent access:

pub struct LLMSession {
    conversation: RwLock<Arc<Vec<Message>>>,
    tool_definitions: RwLock<Vec<LLMTool>>,
}

// Reading (allows multiple readers)
let conversation = self.conversation.read().await;

// Writing (exclusive access)
let mut conversation = self.conversation.write().await;
conversation.push(message);

Error Handling in Async Code

Async errors are propagated with ? and logged:

async fn handle_message(&self, payload: ControllerInputPayload) {
    if let Err(e) = self.process_message(payload).await {
        tracing::error!("Failed to process message: {}", e);
        self.emit_error(e.to_string()).await;
    }
}

Channel send errors use fallible sends:

// Non-blocking, logs on failure
if let Err(e) = tx.try_send(msg) {
    tracing::warn!("Channel full or closed: {}", e);
}

// Blocking, propagates error
tx.send(msg).await?;

Performance Considerations

Avoid Blocking in Async

Blocking calls inside async tasks stall the worker thread and every task scheduled on it. Use the async equivalents instead:

// Bad - blocks the runtime thread
let result = std::fs::read_to_string("file.txt")?;

// Good - uses async I/O
let result = tokio::fs::read_to_string("file.txt").await?;

Minimize Lock Contention

Release locks before awaiting:

// Bad - holds lock across await
let guard = self.data.lock().await;
do_async_work(&guard).await;  // Lock held during await

// Good - clone data, release lock
let data = {
    let guard = self.data.lock().await;
    guard.clone()
};
do_async_work(&data).await;
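The clone-then-release pattern works the same way with a std Mutex, shown here as a self-contained sketch: the snapshot is taken inside a short block so the guard is dropped before the slow work runs.

```rust
use std::sync::Mutex;

// Copies the shared data out under the lock, then computes on the copy
// with the lock already released.
fn process_snapshot(shared: &Mutex<Vec<i32>>) -> i32 {
    let snapshot = {
        let guard = shared.lock().unwrap();
        guard.clone() // guard dropped at the end of this block
    };
    snapshot.iter().sum() // "slow work" runs lock-free
}

fn main() {
    let shared = Mutex::new(vec![1, 2, 3]);
    assert_eq!(process_snapshot(&shared), 6);
}
```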

Use Arc for Shared Ownership

Clone Arc references instead of the runtime:

let controller = self.controller.clone();  // Clones Arc, not data
self.runtime.spawn(async move {
    controller.process().await;
});

Next Steps