Message Handling

The LLMController’s main event loop uses a 6-channel tokio::select! pattern to process messages from multiple sources concurrently. This page explains the pattern, why it is used, and how messages flow through the controller.

Overview

The controller acts as a central hub, receiving messages from six different channels:

┌─────────────────┐
│   LLMSession    │───▶ from_llm_rx
└─────────────────┘

┌─────────────────┐
│   InputRouter   │───▶ input_rx
└─────────────────┘
                         ┌───────────────────┐
┌─────────────────┐      │                   │
│  ToolExecutor   │───▶  │   LLMController   │───▶ ControllerEvent
└─────────────────┘      │    select! loop   │
  (batch_result_rx)      │                   │
                         └───────────────────┘
┌─────────────────┐
│   Tool Tasks    │───▶ tool_result_rx
└─────────────────┘

┌─────────────────┐
│ UserInteraction │───▶ user_interaction_rx
│    Registry     │
└─────────────────┘

┌─────────────────┐
│   Permission    │───▶ permission_rx
│    Registry     │
└─────────────────┘

The Six Channels

Channel              Source                   Message Type            Purpose
from_llm_rx          LLMSession               FromLLMPayload          Streaming responses from LLM
input_rx             InputRouter              ControllerInputPayload  User input and control commands
batch_result_rx      ToolExecutor             ToolBatchResult         Completed tool batch for LLM
tool_result_rx       Tool tasks               ToolResult              Individual tool results for UI
user_interaction_rx  UserInteractionRegistry  ControllerEvent         User question requests
permission_rx        PermissionRegistry       ControllerEvent         Permission requests

The Select Loop

The main event loop acquires locks on all receiver channels, then uses tokio::select! to wait for the first available message:

pub async fn start(&self) {
    loop {
        // Acquire all locks at the start of each iteration
        let mut from_llm_guard = self.from_llm_rx.lock().await;
        let mut input_guard = self.input_rx.lock().await;
        let mut batch_result_guard = self.batch_result_rx.lock().await;
        let mut tool_result_guard = self.tool_result_rx.lock().await;
        let mut user_interaction_guard = self.user_interaction_rx.lock().await;
        let mut permission_guard = self.permission_rx.lock().await;

        tokio::select! {
            // Cancellation signal
            _ = self.cancel_token.cancelled() => {
                tracing::info!("Controller cancelled");
                break;
            }

            // Channel 1: LLM responses
            msg = from_llm_guard.recv() => {
                drop(from_llm_guard);
                drop(input_guard);
                drop(batch_result_guard);
                drop(tool_result_guard);
                drop(user_interaction_guard);
                drop(permission_guard);
                if let Some(payload) = msg {
                    self.handle_llm_response(payload).await;
                } else {
                    break; // Channel closed
                }
            }

            // Channel 2: User input
            msg = input_guard.recv() => {
                // ... drop guards and handle
            }

            // Channel 3: Batch tool results
            batch_result = batch_result_guard.recv() => {
                // ... drop guards and handle
            }

            // Channel 4: Individual tool results
            tool_result = tool_result_guard.recv() => {
                // ... drop guards and handle
            }

            // Channel 5: User interaction events
            user_interaction_event = user_interaction_guard.recv() => {
                // ... drop guards and handle
            }

            // Channel 6: Permission events
            permission_event = permission_guard.recv() => {
                // ... drop guards and handle
            }
        }
    }
}

The Mutex Pattern

Each receiver is wrapped in a Mutex:

pub struct LLMController {
    from_llm_rx: Mutex<mpsc::Receiver<FromLLMPayload>>,
    input_rx: Mutex<mpsc::Receiver<ControllerInputPayload>>,
    batch_result_rx: Mutex<mpsc::Receiver<ToolBatchResult>>,
    tool_result_rx: Mutex<mpsc::Receiver<ToolResult>>,
    user_interaction_rx: Mutex<mpsc::Receiver<ControllerEvent>>,
    permission_rx: Mutex<mpsc::Receiver<ControllerEvent>>,
}

Why Mutex?

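Receiving from a channel requires mutable access: mpsc::Receiver::recv() takes &mut self. Since LLMController is shared via Arc, its methods only get &self, so the receivers need interior mutability. Wrapping each receiver in an async Mutex (the lock().await calls in the loop) provides this safely.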

Lock Acquisition Order

Locks are acquired in a consistent order at the start of each iteration:

let mut from_llm_guard = self.from_llm_rx.lock().await;
let mut input_guard = self.input_rx.lock().await;
let mut batch_result_guard = self.batch_result_rx.lock().await;
let mut tool_result_guard = self.tool_result_rx.lock().await;
let mut user_interaction_guard = self.user_interaction_rx.lock().await;
let mut permission_guard = self.permission_rx.lock().await;

This prevents deadlocks by ensuring locks are always acquired in the same order.
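The ordering rule can be illustrated outside the controller. The following is a minimal sketch, not the controller's code: it uses std::sync::Mutex and threads as a stand-in for Tokio's async mutex and tasks, and the names Pair, bump_both, and run_workers are hypothetical. The principle is the same: if every caller takes the locks in one fixed order, no caller can hold a later lock while waiting for an earlier one.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Two counters, each behind its own lock (stand-ins for two receivers).
pub struct Pair {
    first: Mutex<u32>,
    second: Mutex<u32>,
}

/// Always lock `first` before `second`. Because every caller uses the same
/// order, no thread can hold `second` while waiting for `first`.
pub fn bump_both(pair: &Pair) {
    let mut first = pair.first.lock().unwrap();
    let mut second = pair.second.lock().unwrap();
    *first += 1;
    *second += 1;
}

/// Run `workers` threads that each call `bump_both` `rounds` times,
/// then return the final value of both counters.
pub fn run_workers(workers: u32, rounds: u32) -> (u32, u32) {
    let pair = Arc::new(Pair {
        first: Mutex::new(0),
        second: Mutex::new(0),
    });
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let pair = Arc::clone(&pair);
            thread::spawn(move || {
                for _ in 0..rounds {
                    bump_both(&pair);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let first = *pair.first.lock().unwrap();
    let second = *pair.second.lock().unwrap();
    (first, second)
}
```

If one caller locked second before first, two threads could each hold one lock and wait forever for the other; the fixed order rules that out.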

Immediate Drop Pattern

Each select! branch immediately drops all guards:

msg = from_llm_guard.recv() => {
    drop(from_llm_guard);
    drop(input_guard);
    drop(batch_result_guard);
    drop(tool_result_guard);
    drop(user_interaction_guard);
    drop(permission_guard);
    // Now handle the message; recv() yields None once the channel is closed
    if let Some(payload) = msg {
        self.handle_llm_response(payload).await;
    } else {
        break; // Channel closed
    }
}

This pattern:

  • Releases locks before processing (processing may be slow)
  • Prevents blocking other senders
  • Makes lock lifetimes explicit
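The effect of dropping a guard before processing can be shown in isolation. This is a minimal sketch under stated assumptions: it uses std::sync::Mutex over a VecDeque as a stand-in for a locked receiver, and take_then_process is a hypothetical name, not part of the controller.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Pops one message, releases the lock, then processes the message.
/// Returns the processed text and whether the lock was free while processing.
pub fn take_then_process(queue: &Mutex<VecDeque<String>>) -> Option<(String, bool)> {
    let mut guard = queue.lock().unwrap();
    let message = guard.pop_front();
    // Release the lock before the (potentially slow) processing step.
    drop(guard);

    let message = message?;
    // With the guard gone, other code can lock the queue during processing.
    let lock_was_free = queue.try_lock().is_ok();
    Some((message.to_uppercase(), lock_was_free))
}
```

Without the explicit drop, the guard would live until the end of the function and the lock would stay held for the whole processing step.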

Why This Is Safe and Efficient

The code comments explain the design rationale:

// DESIGN NOTE: Mutex Pattern for Multiple Receivers
// -------------------------------------------------
// This loop acquires all 6 receiver locks at the start of each iteration,
// then immediately drops them in each select! branch. This pattern is:
//
// 1. SAFE: No deadlock risk - locks acquired in consistent order, released immediately
// 2. EFFICIENT: Locks held only during polling (~microseconds), not while waiting
// 3. NON-BLOCKING: Tokio's mpsc senders are lock-free; only receivers need mutex
// 4. CLEAR: Explicit drops make guard lifecycle obvious

Message Handlers

handle_llm_response()

Processes responses from LLM sessions:

async fn handle_llm_response(&self, payload: FromLLMPayload) {
    match payload.response_type {
        LLMResponseType::StreamStart => {
            self.emit_event(ControllerEvent::StreamStart { ... });
        }
        LLMResponseType::TextChunk => {
            self.emit_event(ControllerEvent::TextChunk { ... });
        }
        LLMResponseType::ToolUse | LLMResponseType::ToolBatch => {
            // Extract tool requests and execute
            self.execute_tools(payload.session_id, tools).await;
        }
        LLMResponseType::Complete => {
            self.emit_event(ControllerEvent::Complete { ... });
        }
        LLMResponseType::TokenUpdate => {
            self.token_usage.increment(...);
            self.emit_event(ControllerEvent::TokenUpdate { ... });
        }
        LLMResponseType::Error => {
            self.emit_event(ControllerEvent::Error { ... });
        }
    }
}

handle_input()

Dispatches user input and control commands:

async fn handle_input(&self, payload: ControllerInputPayload) {
    match payload.input_type {
        InputType::Data => {
            self.handle_data_input(payload).await;
        }
        InputType::Control => {
            self.handle_control_input(payload).await;
        }
    }
}

Data input sends messages to the LLM session. Control input handles commands like interrupt, clear, and shutdown.
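The two-level dispatch can be sketched with plain enums. The types below are illustrative assumptions, not the controller's real definitions: Input, ControlCommand, and Action are hypothetical names, and the real payload carries an input_type field rather than an enum with data.

```rust
/// Hypothetical control commands, named after the ones listed above.
#[derive(Debug, Clone, PartialEq)]
pub enum ControlCommand {
    Interrupt,
    Clear,
    Shutdown,
}

/// Hypothetical input type: either data for the LLM or a control command.
#[derive(Debug, Clone, PartialEq)]
pub enum Input {
    Data(String),
    Control(ControlCommand),
}

/// What the controller would do in response (illustrative only).
#[derive(Debug, PartialEq)]
pub enum Action {
    SendToSession(String),
    InterruptSession,
    ClearConversation,
    StopController,
}

/// First level: data versus control. Second level: which control command.
pub fn dispatch(input: Input) -> Action {
    match input {
        Input::Data(text) => Action::SendToSession(text),
        Input::Control(command) => match command {
            ControlCommand::Interrupt => Action::InterruptSession,
            ControlCommand::Clear => Action::ClearConversation,
            ControlCommand::Shutdown => Action::StopController,
        },
    }
}
```

Because both matches are exhaustive, adding a new command variant produces a compile error until dispatch handles it.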

handle_tool_batch_result()

Processes completed tool batches and sends results back to the LLM:

async fn handle_tool_batch_result(&self, batch: ToolBatchResult) {
    let tool_results: Vec<ToolResultInfo> = batch.results
        .into_iter()
        .map(|r| ToolResultInfo {
            tool_use_id: r.tool_use_id,
            content: r.content,
            is_error: r.status != ToolResultStatus::Success,
        })
        .collect();

    // Send tool results back to the LLM
    if let Some(session) = self.session_mgr.get_session_by_id(batch.session_id).await {
        session.send(ToLLMPayload {
            request_type: LLMRequestType::ToolResult,
            tool_results,
            ..Default::default()
        }).await;
    }
}
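The conversion step in this handler can be tried on its own. The sketch below uses simplified stand-in types: the field sets are reduced to what the mapping needs, and the Error and Timeout variants of ToolResultStatus are assumptions, since only Success appears above. to_result_infos is a hypothetical helper name.

```rust
/// Assumed status variants; only `Success` is confirmed by the handler above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ToolResultStatus {
    Success,
    Error,
    Timeout,
}

/// Simplified tool result with only the fields the mapping reads.
pub struct ToolResult {
    pub tool_use_id: String,
    pub content: String,
    pub status: ToolResultStatus,
}

/// Simplified shape of what is sent back to the LLM.
#[derive(Debug, PartialEq)]
pub struct ToolResultInfo {
    pub tool_use_id: String,
    pub content: String,
    pub is_error: bool,
}

/// Any status other than `Success` is reported to the LLM as an error.
pub fn to_result_infos(results: Vec<ToolResult>) -> Vec<ToolResultInfo> {
    results
        .into_iter()
        .map(|r| ToolResultInfo {
            tool_use_id: r.tool_use_id,
            content: r.content,
            is_error: r.status != ToolResultStatus::Success,
        })
        .collect()
}
```

Note that the status is collapsed to a single boolean here, so the LLM sees a timeout and a plain failure the same way unless the content field distinguishes them.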

Tool Result Handler

Individual tool results are emitted for UI feedback:

tool_result = tool_result_guard.recv() => {
    // ...
    if let Some(result) = tool_result {
        if let Some(ref func) = self.event_func {
            func(ControllerEvent::ToolResult {
                session_id: result.session_id,
                tool_use_id: result.tool_use_id,
                tool_name: result.tool_name,
                display_name: result.display_name,
                status: result.status,
                content: result.content,
                error: result.error,
                turn_id: result.turn_id,
            });
        }
    }
}

User Interaction Handler

Forwards user interaction requests to the event callback:

user_interaction_event = user_interaction_guard.recv() => {
    // ...
    if let Some(event) = user_interaction_event {
        if let Some(ref func) = self.event_func {
            func(event);
        }
    }
}

Permission Handler

Forwards permission requests to the event callback:

permission_event = permission_guard.recv() => {
    // ...
    if let Some(event) = permission_event {
        if let Some(ref func) = self.event_func {
            func(event);
        }
    }
}

Event Emission

The controller emits events through the callback function:

fn emit_event(&self, event: ControllerEvent) {
    if let Some(ref func) = self.event_func {
        func(event);
    }
}

The callback is set during controller construction and typically forwards events to the TUI channel.
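One way such a callback might forward events to a channel is sketched below. This is an assumption-laden illustration: Controller here is a tiny stand-in with only the callback field, ControllerEvent is reduced to two variants, std::sync::mpsc stands in for the TUI channel, and controller_with_channel is a hypothetical helper.

```rust
use std::sync::mpsc;

/// Reduced stand-in for the real event enum.
#[derive(Debug, PartialEq)]
pub enum ControllerEvent {
    TextChunk(String),
    Complete,
}

/// Boxed callback type; the real controller's bounds may differ.
pub type EventFunc = Box<dyn Fn(ControllerEvent) + Send>;

/// Stand-in controller holding only the optional callback.
pub struct Controller {
    event_func: Option<EventFunc>,
}

impl Controller {
    pub fn new(event_func: Option<EventFunc>) -> Self {
        Self { event_func }
    }

    /// Same shape as emit_event() above: a no-op when no callback is set.
    pub fn emit_event(&self, event: ControllerEvent) {
        if let Some(ref func) = self.event_func {
            func(event);
        }
    }
}

/// Builds a controller whose callback forwards every event into a channel.
pub fn controller_with_channel() -> (Controller, mpsc::Receiver<ControllerEvent>) {
    let (tx, rx) = mpsc::channel::<ControllerEvent>();
    let func: EventFunc = Box::new(move |event: ControllerEvent| {
        // Ignore send errors: the receiving side may already be gone.
        let _ = tx.send(event);
    });
    (Controller::new(Some(func)), rx)
}
```

Keeping the callback optional means the controller can run without any listener, for example in tests that do not care about events.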

Cancellation

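Cancellation is one of the branches of the select!, so the loop can exit while it is waiting for messages, not only between them. Note that tokio::select! polls its branches in random order unless biased; is specified, so being listed first does not give the cancellation branch priority over a message that is ready at the same moment.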

_ = self.cancel_token.cancelled() => {
    tracing::info!("Controller cancelled");
    break;
}

This enables graceful shutdown when cancel_token.cancel() is called.

Alternative Patterns Considered

The code comments note alternative patterns that were considered:

  1. Unified event channel: Would lose type safety, require boxing all events
  2. Select on lock().recv() directly: Makes code harder to reason about
  3. Lock-free structures: Overkill; Tokio primitives are already optimized

The chosen pattern balances safety, clarity, and performance.

Send Input Method

External code sends input to the controller via send_input():

pub async fn send_input(&self, payload: ControllerInputPayload) -> Result<(), ControllerError> {
    tokio::time::timeout(SEND_INPUT_TIMEOUT, self.input_tx.send(payload))
        .await
        .map_err(|_| ControllerError::SendTimeout)?
        .map_err(|_| ControllerError::ChannelClosed)
}

This method:

  • Has a 5-second timeout to prevent indefinite blocking
  • Returns errors if the channel is closed or times out
  • Is called by the InputRouter
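The two map_err calls in send_input() work on a nested result: tokio::time::timeout wraps the send future's own Result in an outer Result whose error means the deadline elapsed. The sketch below models only that flattening step with std types. Elapsed and SendFailed are hypothetical stand-ins for the Tokio error types, and flatten_send_outcome is an illustrative name.

```rust
/// The two failure modes exposed to callers, as in send_input() above.
#[derive(Debug, PartialEq)]
pub enum ControllerError {
    SendTimeout,
    ChannelClosed,
}

/// Hypothetical stand-in for the error returned when the timeout fires.
pub struct Elapsed;

/// Hypothetical stand-in for the error returned when the receiver is gone.
pub struct SendFailed;

/// Outer Err: the deadline passed. Inner Err: the channel was closed.
pub fn flatten_send_outcome(
    outcome: Result<Result<(), SendFailed>, Elapsed>,
) -> Result<(), ControllerError> {
    outcome
        .map_err(|_| ControllerError::SendTimeout)?
        .map_err(|_| ControllerError::ChannelClosed)
}
```

The order matters: the first map_err handles the outer (timeout) layer, the ? unwraps it, and the second map_err handles the inner (send) layer.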

Shutdown Sequence

When shutdown is triggered:

  1. cancel_token.cancel() is called
  2. The select loop breaks on the cancellation branch
  3. Any in-flight messages are dropped
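  4. The start() method returns

The shutdown() method triggers this sequence by cancelling the token and setting the shutdown flag:
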
pub async fn shutdown(&self) {
    self.cancel_token.cancel();
    self.shutdown.store(true, Ordering::SeqCst);
}
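The consequence of step 3, that queued messages are not processed once shutdown has been requested, can be seen in a much simpler model. The sketch below is not the controller's loop: it is synchronous, uses a std channel and an AtomicBool in place of the cancellation token, and checks the flag only between messages rather than while waiting. run_until_shutdown is a hypothetical name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;

/// Handles messages until the flag is set or every sender is dropped.
/// Returns how many messages were handled.
pub fn run_until_shutdown(rx: &Receiver<String>, shutdown: &AtomicBool) -> usize {
    let mut handled = 0;
    loop {
        // Shutdown wins over pending work: queued messages stay unhandled.
        if shutdown.load(Ordering::SeqCst) {
            break;
        }
        match rx.recv() {
            Ok(_message) => handled += 1,
            Err(_) => break, // All senders dropped: channel closed.
        }
    }
    handled
}
```

Callers that need every queued message handled before exit would have to drain the channel first, which this model, like the sequence above, does not do.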
