LLMController

The LLMController is the central coordinator for all LLM interactions. It manages sessions, processes user input, handles LLM responses, executes tools, and emits events for the TUI. This page documents its structure, responsibilities, and internal operation.

Overview

The controller acts as a hub between multiple components:

┌─────────────────┐          ┌─────────────────┐
│      TUI        │◀────────▶│  InputRouter    │
└─────────────────┘          └────────┬────────┘


┌─────────────────┐          ┌─────────────────┐          ┌─────────────────┐
│   LLMSession    │◀────────▶│  LLMController  │◀────────▶│  ToolExecutor   │
└─────────────────┘          └─────────────────┘          └─────────────────┘

                    ┌─────────────────┼─────────────────┐
                    ▼                 ▼                 ▼
            ┌───────────┐     ┌───────────┐     ┌───────────┐
            │ ToolReg   │     │ UserReg   │     │ PermReg   │
            └───────────┘     └───────────┘     └───────────┘

Struct Definition

pub struct LLMController {
    // Session management
    session_mgr: LLMSessionManager,
    token_usage: TokenUsageTracker,

    // LLM response channel
    from_llm_rx: Mutex<mpsc::Receiver<FromLLMPayload>>,
    from_llm_tx: mpsc::Sender<FromLLMPayload>,

    // User input channel
    input_rx: Mutex<mpsc::Receiver<ControllerInputPayload>>,
    input_tx: mpsc::Sender<ControllerInputPayload>,

    // Tool execution channels
    tool_result_rx: Mutex<mpsc::Receiver<ToolResult>>,
    batch_result_rx: Mutex<mpsc::Receiver<ToolBatchResult>>,

    // User interaction channels
    user_interaction_rx: Mutex<mpsc::Receiver<ControllerEvent>>,
    permission_rx: Mutex<mpsc::Receiver<ControllerEvent>>,

    // Registries
    tool_registry: Arc<ToolRegistry>,
    user_interaction_registry: Arc<UserInteractionRegistry>,
    permission_registry: Arc<PermissionRegistry>,

    // Tool execution
    tool_executor: ToolExecutor,

    // State flags
    started: AtomicBool,
    shutdown: AtomicBool,
    cancel_token: CancellationToken,

    // Event callback
    event_func: Option<EventFunc>,
}

Construction

impl LLMController {
    pub fn new(event_func: Option<EventFunc>) -> Self {
        // Create LLM communication channels
        let (from_llm_tx, from_llm_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let (input_tx, input_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

        // Create tool execution channels
        let (tool_result_tx, tool_result_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let (batch_result_tx, batch_result_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

        // Create user interaction registry
        let (user_interaction_tx, user_interaction_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let user_interaction_registry = Arc::new(
            UserInteractionRegistry::new(user_interaction_tx)
        );

        // Create permission registry
        let (permission_tx, permission_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
        let permission_registry = Arc::new(
            PermissionRegistry::new(permission_tx)
        );

        // Create tool infrastructure
        let tool_registry = Arc::new(ToolRegistry::new());
        let tool_executor = ToolExecutor::new(
            tool_registry.clone(),
            tool_result_tx,
            batch_result_tx,
        );

        Self {
            session_mgr: LLMSessionManager::new(),
            token_usage: TokenUsageTracker::new(),
            from_llm_rx: Mutex::new(from_llm_rx),
            from_llm_tx,
            input_rx: Mutex::new(input_rx),
            input_tx,
            tool_result_rx: Mutex::new(tool_result_rx),
            batch_result_rx: Mutex::new(batch_result_rx),
            user_interaction_rx: Mutex::new(user_interaction_rx),
            permission_rx: Mutex::new(permission_rx),
            tool_registry,
            user_interaction_registry,
            permission_registry,
            tool_executor,
            started: AtomicBool::new(false),
            shutdown: AtomicBool::new(false),
            cancel_token: CancellationToken::new(),
            event_func,
        }
    }
}

The Main Event Loop

The start() method runs the 6-channel select loop:

pub async fn start(&self) {
    // Ensure idempotent startup
    if self.started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
        tracing::warn!("Controller already started");
        return;
    }

    tracing::info!("Controller starting");

    loop {
        // Acquire all receiver locks
        let mut from_llm_guard = self.from_llm_rx.lock().await;
        let mut input_guard = self.input_rx.lock().await;
        let mut batch_result_guard = self.batch_result_rx.lock().await;
        let mut tool_result_guard = self.tool_result_rx.lock().await;
        let mut user_interaction_guard = self.user_interaction_rx.lock().await;
        let mut permission_guard = self.permission_rx.lock().await;

        tokio::select! {
            // Cancellation check
            _ = self.cancel_token.cancelled() => {
                tracing::info!("Controller cancelled");
                break;
            }

            // LLM responses
            msg = from_llm_guard.recv() => {
                drop_all_guards!(...);
                if let Some(payload) = msg {
                    self.handle_llm_response(payload).await;
                } else {
                    break;
                }
            }

            // User input
            msg = input_guard.recv() => {
                drop_all_guards!(...);
                if let Some(payload) = msg {
                    self.handle_input(payload).await;
                } else {
                    break;
                }
            }

            // Batch tool results
            batch = batch_result_guard.recv() => {
                drop_all_guards!(...);
                if let Some(result) = batch {
                    self.handle_tool_batch_result(result).await;
                }
            }

            // Individual tool results (for UI feedback)
            result = tool_result_guard.recv() => {
                drop_all_guards!(...);
                if let Some(result) = result {
                    self.emit_tool_result(result);
                }
            }

            // User interaction events
            event = user_interaction_guard.recv() => {
                drop_all_guards!(...);
                if let Some(event) = event {
                    self.forward_event(event);
                }
            }

            // Permission events
            event = permission_guard.recv() => {
                drop_all_guards!(...);
                if let Some(event) = event {
                    self.forward_event(event);
                }
            }
        }
    }

    tracing::info!("Controller stopped");
}

Session Management

LLMSessionManager

The session manager tracks active sessions:

pub struct LLMSessionManager {
    sessions: RwLock<HashMap<i64, Arc<LLMSession>>>,
}

impl LLMSessionManager {
    pub async fn create_session(
        &self,
        config: LLMSessionConfig,
        from_llm_tx: mpsc::Sender<FromLLMPayload>,
    ) -> Result<i64, LlmError> {
        let session = Arc::new(LLMSession::new(config, from_llm_tx)?);
        let session_id = session.id();

        // Store session
        self.sessions.write().await.insert(session_id, session.clone());

        // Start session task
        tokio::spawn(async move {
            session.start().await;
        });

        Ok(session_id)
    }

    pub async fn get_session_by_id(&self, id: i64) -> Option<Arc<LLMSession>> {
        self.sessions.read().await.get(&id).cloned()
    }
}

Creating Sessions

impl LLMController {
    pub async fn create_session(
        &self,
        config: LLMSessionConfig,
        tools: &[LLMTool],
    ) -> Result<i64, LlmError> {
        let session_id = self.session_mgr
            .create_session(config, self.from_llm_tx.clone())
            .await?;

        // Add tools to session
        if let Some(session) = self.session_mgr.get_session_by_id(session_id).await {
            session.set_tools(tools.to_vec()).await;
        }

        Ok(session_id)
    }
}

Token Usage Tracking

pub struct TokenUsageTracker {
    usage: RwLock<HashMap<i64, SessionUsage>>,
}

struct SessionUsage {
    model: String,
    input_tokens: i64,
    output_tokens: i64,
}

impl TokenUsageTracker {
    pub async fn increment(
        &self,
        session_id: i64,
        model: &str,
        input: i64,
        output: i64,
    ) {
        let mut usage = self.usage.write().await;
        let entry = usage.entry(session_id).or_insert(SessionUsage {
            model: model.to_string(),
            input_tokens: 0,
            output_tokens: 0,
        });
        entry.input_tokens += input;
        entry.output_tokens += output;
    }

    pub async fn get(&self, session_id: i64) -> Option<(i64, i64)> {
        self.usage.read().await.get(&session_id)
            .map(|u| (u.input_tokens, u.output_tokens))
    }
}

Sending Input

External code sends input via send_input():

pub async fn send_input(&self, payload: ControllerInputPayload) -> Result<(), ControllerError> {
    if self.is_shutdown() {
        return Err(ControllerError::Shutdown);
    }

    // 5-second timeout
    tokio::time::timeout(
        Duration::from_secs(5),
        self.input_tx.send(payload)
    )
    .await
    .map_err(|_| ControllerError::SendTimeout(5))?
    .map_err(|_| ControllerError::ChannelClosed)
}

Event Emission

Events are emitted through the callback function:

fn emit_event(&self, event: ControllerEvent) {
    if let Some(ref func) = self.event_func {
        func(event);
    }
}

fn emit_error(&self, session_id: i64, error: &str, turn_id: Option<TurnId>) {
    self.emit_event(ControllerEvent::Error {
        session_id,
        error: error.to_string(),
        turn_id,
    });
}

Registry Access

impl LLMController {
    pub fn tool_registry(&self) -> Arc<ToolRegistry> {
        self.tool_registry.clone()
    }

    pub fn user_interaction_registry(&self) -> Arc<UserInteractionRegistry> {
        self.user_interaction_registry.clone()
    }

    pub fn permission_registry(&self) -> Arc<PermissionRegistry> {
        self.permission_registry.clone()
    }
}

Shutdown

pub async fn shutdown(&self) {
    if self.shutdown.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
        return; // Already shutting down
    }

    tracing::info!("Controller shutting down");
    self.cancel_token.cancel();
}

pub fn is_shutdown(&self) -> bool {
    self.shutdown.load(Ordering::SeqCst)
}

State Flags

Atomic booleans track controller state:

started: AtomicBool,   // Has start() been called?
shutdown: AtomicBool,  // Has shutdown() been called?

These ensure:

  • start() is idempotent (only runs once)
  • shutdown() is idempotent (only runs once)
  • send_input() fails after shutdown

Thread Safety

The controller is designed for concurrent access:

ComponentSynchronization
ReceiversMutex (required for select!)
SessionsRwLock (read-heavy)
Token usageRwLock (read-heavy)
State flagsAtomicBool (lock-free)
RegistriesArc + internal locking

Event Types

The callback receives events defined in ControllerEvent:

pub type EventFunc = Box<dyn Fn(ControllerEvent) + Send + Sync>;

See Controller Events for all event variants.

Next Steps