LLMController
The LLMController is the central coordinator for all LLM interactions. It manages sessions, processes user input, handles LLM responses, executes tools, and emits events for the TUI. This page documents its structure, responsibilities, and internal operation.
Overview
The controller acts as a hub between multiple components:
┌─────────────────┐ ┌─────────────────┐
│ TUI │◀────────▶│ InputRouter │
└─────────────────┘ └────────┬────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ LLMSession │◀────────▶│ LLMController │◀────────▶│ ToolExecutor │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ ToolReg │ │ UserReg │ │ PermReg │
└───────────┘ └───────────┘ └───────────┘
Struct Definition
pub struct LLMController {
// Session management
session_mgr: LLMSessionManager,
token_usage: TokenUsageTracker,
// LLM response channel
from_llm_rx: Mutex<mpsc::Receiver<FromLLMPayload>>,
from_llm_tx: mpsc::Sender<FromLLMPayload>,
// User input channel
input_rx: Mutex<mpsc::Receiver<ControllerInputPayload>>,
input_tx: mpsc::Sender<ControllerInputPayload>,
// Tool execution channels
tool_result_rx: Mutex<mpsc::Receiver<ToolResult>>,
batch_result_rx: Mutex<mpsc::Receiver<ToolBatchResult>>,
// User interaction channels
user_interaction_rx: Mutex<mpsc::Receiver<ControllerEvent>>,
permission_rx: Mutex<mpsc::Receiver<ControllerEvent>>,
// Registries
tool_registry: Arc<ToolRegistry>,
user_interaction_registry: Arc<UserInteractionRegistry>,
permission_registry: Arc<PermissionRegistry>,
// Tool execution
tool_executor: ToolExecutor,
// State flags
started: AtomicBool,
shutdown: AtomicBool,
cancel_token: CancellationToken,
// Event callback
event_func: Option<EventFunc>,
}
Construction
impl LLMController {
pub fn new(event_func: Option<EventFunc>) -> Self {
// Create LLM communication channels
let (from_llm_tx, from_llm_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let (input_tx, input_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
// Create tool execution channels
let (tool_result_tx, tool_result_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let (batch_result_tx, batch_result_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
// Create user interaction registry
let (user_interaction_tx, user_interaction_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let user_interaction_registry = Arc::new(
UserInteractionRegistry::new(user_interaction_tx)
);
// Create permission registry
let (permission_tx, permission_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
let permission_registry = Arc::new(
PermissionRegistry::new(permission_tx)
);
// Create tool infrastructure
let tool_registry = Arc::new(ToolRegistry::new());
let tool_executor = ToolExecutor::new(
tool_registry.clone(),
tool_result_tx,
batch_result_tx,
);
Self {
session_mgr: LLMSessionManager::new(),
token_usage: TokenUsageTracker::new(),
from_llm_rx: Mutex::new(from_llm_rx),
from_llm_tx,
input_rx: Mutex::new(input_rx),
input_tx,
tool_result_rx: Mutex::new(tool_result_rx),
batch_result_rx: Mutex::new(batch_result_rx),
user_interaction_rx: Mutex::new(user_interaction_rx),
permission_rx: Mutex::new(permission_rx),
tool_registry,
user_interaction_registry,
permission_registry,
tool_executor,
started: AtomicBool::new(false),
shutdown: AtomicBool::new(false),
cancel_token: CancellationToken::new(),
event_func,
}
}
}
The Main Event Loop
The start() method runs the 6-channel select loop:
pub async fn start(&self) {
// Ensure idempotent startup
if self.started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
tracing::warn!("Controller already started");
return;
}
tracing::info!("Controller starting");
loop {
// Acquire all receiver locks
let mut from_llm_guard = self.from_llm_rx.lock().await;
let mut input_guard = self.input_rx.lock().await;
let mut batch_result_guard = self.batch_result_rx.lock().await;
let mut tool_result_guard = self.tool_result_rx.lock().await;
let mut user_interaction_guard = self.user_interaction_rx.lock().await;
let mut permission_guard = self.permission_rx.lock().await;
tokio::select! {
// Cancellation check
_ = self.cancel_token.cancelled() => {
tracing::info!("Controller cancelled");
break;
}
// LLM responses
msg = from_llm_guard.recv() => {
drop_all_guards!(...);
if let Some(payload) = msg {
self.handle_llm_response(payload).await;
} else {
break;
}
}
// User input
msg = input_guard.recv() => {
drop_all_guards!(...);
if let Some(payload) = msg {
self.handle_input(payload).await;
} else {
break;
}
}
// Batch tool results
batch = batch_result_guard.recv() => {
drop_all_guards!(...);
if let Some(result) = batch {
self.handle_tool_batch_result(result).await;
}
}
// Individual tool results (for UI feedback)
result = tool_result_guard.recv() => {
drop_all_guards!(...);
if let Some(result) = result {
self.emit_tool_result(result);
}
}
// User interaction events
event = user_interaction_guard.recv() => {
drop_all_guards!(...);
if let Some(event) = event {
self.forward_event(event);
}
}
// Permission events
event = permission_guard.recv() => {
drop_all_guards!(...);
if let Some(event) = event {
self.forward_event(event);
}
}
}
}
tracing::info!("Controller stopped");
}
Session Management
LLMSessionManager
The session manager tracks active sessions:
pub struct LLMSessionManager {
sessions: RwLock<HashMap<i64, Arc<LLMSession>>>,
}
impl LLMSessionManager {
pub async fn create_session(
&self,
config: LLMSessionConfig,
from_llm_tx: mpsc::Sender<FromLLMPayload>,
) -> Result<i64, LlmError> {
let session = Arc::new(LLMSession::new(config, from_llm_tx)?);
let session_id = session.id();
// Store session
self.sessions.write().await.insert(session_id, session.clone());
// Start session task
tokio::spawn(async move {
session.start().await;
});
Ok(session_id)
}
pub async fn get_session_by_id(&self, id: i64) -> Option<Arc<LLMSession>> {
self.sessions.read().await.get(&id).cloned()
}
}
Creating Sessions
impl LLMController {
pub async fn create_session(
&self,
config: LLMSessionConfig,
tools: &[LLMTool],
) -> Result<i64, LlmError> {
let session_id = self.session_mgr
.create_session(config, self.from_llm_tx.clone())
.await?;
// Add tools to session
if let Some(session) = self.session_mgr.get_session_by_id(session_id).await {
session.set_tools(tools.to_vec()).await;
}
Ok(session_id)
}
}
Token Usage Tracking
pub struct TokenUsageTracker {
usage: RwLock<HashMap<i64, SessionUsage>>,
}
struct SessionUsage {
model: String,
input_tokens: i64,
output_tokens: i64,
}
impl TokenUsageTracker {
pub async fn increment(
&self,
session_id: i64,
model: &str,
input: i64,
output: i64,
) {
let mut usage = self.usage.write().await;
let entry = usage.entry(session_id).or_insert(SessionUsage {
model: model.to_string(),
input_tokens: 0,
output_tokens: 0,
});
entry.input_tokens += input;
entry.output_tokens += output;
}
pub async fn get(&self, session_id: i64) -> Option<(i64, i64)> {
self.usage.read().await.get(&session_id)
.map(|u| (u.input_tokens, u.output_tokens))
}
}
Sending Input
External code sends input via send_input():
pub async fn send_input(&self, payload: ControllerInputPayload) -> Result<(), ControllerError> {
if self.is_shutdown() {
return Err(ControllerError::Shutdown);
}
// 5-second timeout
tokio::time::timeout(
Duration::from_secs(5),
self.input_tx.send(payload)
)
.await
.map_err(|_| ControllerError::SendTimeout(5))?
.map_err(|_| ControllerError::ChannelClosed)
}
Event Emission
Events are emitted through the callback function:
fn emit_event(&self, event: ControllerEvent) {
if let Some(ref func) = self.event_func {
func(event);
}
}
fn emit_error(&self, session_id: i64, error: &str, turn_id: Option<TurnId>) {
self.emit_event(ControllerEvent::Error {
session_id,
error: error.to_string(),
turn_id,
});
}
Registry Access
impl LLMController {
pub fn tool_registry(&self) -> Arc<ToolRegistry> {
self.tool_registry.clone()
}
pub fn user_interaction_registry(&self) -> Arc<UserInteractionRegistry> {
self.user_interaction_registry.clone()
}
pub fn permission_registry(&self) -> Arc<PermissionRegistry> {
self.permission_registry.clone()
}
}
Shutdown
pub async fn shutdown(&self) {
if self.shutdown.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
return; // Already shutting down
}
tracing::info!("Controller shutting down");
self.cancel_token.cancel();
}
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::SeqCst)
}
State Flags
Atomic booleans track controller state:
started: AtomicBool, // Has start() been called?
shutdown: AtomicBool, // Has shutdown() been called?
These ensure:
start()is idempotent (only runs once)shutdown()is idempotent (only runs once)send_input()fails after shutdown
Thread Safety
The controller is designed for concurrent access:
| Component | Synchronization |
|---|---|
| Receivers | Mutex (required for select!) |
| Sessions | RwLock (read-heavy) |
| Token usage | RwLock (read-heavy) |
| State flags | AtomicBool (lock-free) |
| Registries | Arc + internal locking |
Event Types
The callback receives events defined in ControllerEvent:
pub type EventFunc = Box<dyn Fn(ControllerEvent) + Send + Sync>;
See Controller Events for all event variants.
Next Steps
- Input Processing - How input is handled
- Response Handling - How responses are processed
- Message Handling - The 6-channel select pattern
- Controller Events - Event definitions
