Message Handling
The LLMController’s main event loop uses a 6-channel tokio::select! pattern to process messages from multiple sources concurrently. This page explains the pattern, why it is used, and how messages flow through the controller.
Overview
The controller acts as a central hub, receiving messages from six different channels:
┌─────────────────┐
│ LLMSession │───▶ from_llm_rx
└─────────────────┘
┌─────────────────┐
│ InputRouter │───▶ input_rx
└─────────────────┘
┌───────────────────┐
┌─────────────────┐ │ │
│ ToolExecutor │───▶ │ LLMController │───▶ ControllerEvent
└─────────────────┘ │ select! loop │
(batch_result_rx) │ │
└───────────────────┘
┌─────────────────┐
│ Tool Tasks │───▶ tool_result_rx
└─────────────────┘
┌─────────────────┐
│ UserInteraction │───▶ user_interaction_rx
│ Registry │
└─────────────────┘
┌─────────────────┐
│ Permission │───▶ permission_rx
│ Registry │
└─────────────────┘
The Six Channels
| Channel | Source | Message Type | Purpose |
|---|---|---|---|
from_llm_rx | LLMSession | FromLLMPayload | Streaming responses from LLM |
input_rx | InputRouter | ControllerInputPayload | User input and control commands |
batch_result_rx | ToolExecutor | ToolBatchResult | Completed tool batch for LLM |
tool_result_rx | Tool tasks | ToolResult | Individual tool results for UI |
user_interaction_rx | UserInteractionRegistry | ControllerEvent | User question requests |
permission_rx | PermissionRegistry | ControllerEvent | Permission requests |
The Select Loop
The main event loop acquires locks on all receiver channels, then uses tokio::select! to wait for the first available message:
pub async fn start(&self) {
loop {
// Acquire all locks at the start of each iteration
let mut from_llm_guard = self.from_llm_rx.lock().await;
let mut input_guard = self.input_rx.lock().await;
let mut batch_result_guard = self.batch_result_rx.lock().await;
let mut tool_result_guard = self.tool_result_rx.lock().await;
let mut user_interaction_guard = self.user_interaction_rx.lock().await;
let mut permission_guard = self.permission_rx.lock().await;
tokio::select! {
// Cancellation signal
_ = self.cancel_token.cancelled() => {
tracing::info!("Controller cancelled");
break;
}
// Channel 1: LLM responses
msg = from_llm_guard.recv() => {
drop(from_llm_guard);
drop(input_guard);
drop(batch_result_guard);
drop(tool_result_guard);
drop(user_interaction_guard);
drop(permission_guard);
if let Some(payload) = msg {
self.handle_llm_response(payload).await;
} else {
break; // Channel closed
}
}
// Channel 2: User input
msg = input_guard.recv() => {
// ... drop guards and handle
}
// Channel 3: Batch tool results
batch_result = batch_result_guard.recv() => {
// ... drop guards and handle
}
// Channel 4: Individual tool results
tool_result = tool_result_guard.recv() => {
// ... drop guards and handle
}
// Channel 5: User interaction events
user_interaction_event = user_interaction_guard.recv() => {
// ... drop guards and handle
}
// Channel 6: Permission events
permission_event = permission_guard.recv() => {
// ... drop guards and handle
}
}
}
}
The Mutex Pattern
Each receiver is wrapped in a Mutex:
pub struct LLMController {
from_llm_rx: Mutex<mpsc::Receiver<FromLLMPayload>>,
input_rx: Mutex<mpsc::Receiver<ControllerInputPayload>>,
batch_result_rx: Mutex<mpsc::Receiver<ToolBatchResult>>,
tool_result_rx: Mutex<mpsc::Receiver<ToolResult>>,
user_interaction_rx: Mutex<mpsc::Receiver<ControllerEvent>>,
permission_rx: Mutex<mpsc::Receiver<ControllerEvent>>,
}
Why Mutex?
The select! macro requires mutable access to receivers. Since LLMController is shared via Arc, receivers need interior mutability. Mutex provides this safely.
Lock Acquisition Order
Locks are acquired in a consistent order at the start of each iteration:
let mut from_llm_guard = self.from_llm_rx.lock().await;
let mut input_guard = self.input_rx.lock().await;
let mut batch_result_guard = self.batch_result_rx.lock().await;
let mut tool_result_guard = self.tool_result_rx.lock().await;
let mut user_interaction_guard = self.user_interaction_rx.lock().await;
let mut permission_guard = self.permission_rx.lock().await;
This prevents deadlocks by ensuring locks are always acquired in the same order.
Immediate Drop Pattern
Each select! branch immediately drops all guards:
msg = from_llm_guard.recv() => {
drop(from_llm_guard);
drop(input_guard);
drop(batch_result_guard);
drop(tool_result_guard);
drop(user_interaction_guard);
drop(permission_guard);
// Now handle the message
self.handle_llm_response(payload).await;
}
This pattern:
- Releases locks before processing (processing may be slow)
- Prevents blocking other senders
- Makes lock lifetimes explicit
Why This Is Safe and Efficient
The code comments explain the design rationale:
// DESIGN NOTE: Mutex Pattern for Multiple Receivers
// -------------------------------------------------
// This loop acquires all 6 receiver locks at the start of each iteration,
// then immediately drops them in each select! branch. This pattern is:
//
// 1. SAFE: No deadlock risk - locks acquired in consistent order, released immediately
// 2. EFFICIENT: Locks held only during polling (~microseconds), not while waiting
// 3. NON-BLOCKING: Tokio's mpsc senders are lock-free; only receivers need mutex
// 4. CLEAR: Explicit drops make guard lifecycle obvious
Message Handlers
handle_llm_response()
Processes responses from LLM sessions:
async fn handle_llm_response(&self, payload: FromLLMPayload) {
match payload.response_type {
LLMResponseType::StreamStart => {
self.emit_event(ControllerEvent::StreamStart { ... });
}
LLMResponseType::TextChunk => {
self.emit_event(ControllerEvent::TextChunk { ... });
}
LLMResponseType::ToolUse | LLMResponseType::ToolBatch => {
// Extract tool requests and execute
self.execute_tools(payload.session_id, tools).await;
}
LLMResponseType::Complete => {
self.emit_event(ControllerEvent::Complete { ... });
}
LLMResponseType::TokenUpdate => {
self.token_usage.increment(...);
self.emit_event(ControllerEvent::TokenUpdate { ... });
}
LLMResponseType::Error => {
self.emit_event(ControllerEvent::Error { ... });
}
}
}
handle_input()
Dispatches user input and control commands:
async fn handle_input(&self, payload: ControllerInputPayload) {
match payload.input_type {
InputType::Data => {
self.handle_data_input(payload).await;
}
InputType::Control => {
self.handle_control_input(payload).await;
}
}
}
Data input sends messages to the LLM session. Control input handles commands like interrupt, clear, and shutdown.
handle_tool_batch_result()
Processes completed tool batches and sends results back to the LLM:
async fn handle_tool_batch_result(&self, batch: ToolBatchResult) {
let tool_results: Vec<ToolResultInfo> = batch.results
.into_iter()
.map(|r| ToolResultInfo {
tool_use_id: r.tool_use_id,
content: r.content,
is_error: r.status != ToolResultStatus::Success,
})
.collect();
// Send tool results back to the LLM
if let Some(session) = self.session_mgr.get_session_by_id(batch.session_id).await {
session.send(ToLLMPayload {
request_type: LLMRequestType::ToolResult,
tool_results,
..Default::default()
}).await;
}
}
Tool Result Handler
Individual tool results are emitted for UI feedback:
tool_result = tool_result_guard.recv() => {
// ...
if let Some(result) = tool_result {
if let Some(ref func) = self.event_func {
func(ControllerEvent::ToolResult {
session_id: result.session_id,
tool_use_id: result.tool_use_id,
tool_name: result.tool_name,
display_name: result.display_name,
status: result.status,
content: result.content,
error: result.error,
turn_id: result.turn_id,
});
}
}
}
User Interaction Handler
Forwards user interaction requests to the event callback:
user_interaction_event = user_interaction_guard.recv() => {
// ...
if let Some(event) = user_interaction_event {
if let Some(ref func) = self.event_func {
func(event);
}
}
}
Permission Handler
Forwards permission requests to the event callback:
permission_event = permission_guard.recv() => {
// ...
if let Some(event) = permission_event {
if let Some(ref func) = self.event_func {
func(event);
}
}
}
Event Emission
The controller emits events through the callback function:
fn emit_event(&self, event: ControllerEvent) {
if let Some(ref func) = self.event_func {
func(event);
}
}
The callback is set during controller construction and typically forwards events to the TUI channel.
Cancellation
The loop checks for cancellation at the start of each select!:
_ = self.cancel_token.cancelled() => {
tracing::info!("Controller cancelled");
break;
}
This enables graceful shutdown when cancel_token.cancel() is called.
Alternative Patterns Considered
The code comments note alternative patterns that were considered:
- Unified event channel: Would lose type safety, require boxing all events
- Select on lock().recv() directly: Makes code harder to reason about
- Lock-free structures: Overkill; Tokio primitives are already optimized
The chosen pattern balances safety, clarity, and performance.
Send Input Method
External code sends input to the controller via send_input():
pub async fn send_input(&self, payload: ControllerInputPayload) -> Result<(), ControllerError> {
tokio::time::timeout(SEND_INPUT_TIMEOUT, self.input_tx.send(payload))
.await
.map_err(|_| ControllerError::SendTimeout)?
.map_err(|_| ControllerError::ChannelClosed)
}
This method:
- Has a 5-second timeout to prevent indefinite blocking
- Returns errors if the channel is closed or times out
- Is called by the InputRouter
Shutdown Sequence
When shutdown is triggered:
cancel_token.cancel()is called- The select loop breaks on the cancellation branch
- Any in-flight messages are dropped
- The
start()method returns
pub async fn shutdown(&self) {
self.cancel_token.cancel();
self.shutdown.store(true, Ordering::SeqCst);
}
Next Steps
- Controller Events - Events emitted by the controller and conversion logic
- Async Runtime - The Tokio runtime that powers the loop
- Message Flow - End-to-end message routing
