Input Processing
This page documents how user input flows from the TUI through the InputRouter to the LLMController, and how different input types are processed.
Input Flow Overview
┌─────────────────┐    ControllerInputPayload    ┌─────────────────┐
│       TUI       │─────────────────────────────▶│   InputRouter   │
│                 │   to_controller_tx.send()    │                 │
└─────────────────┘                              └────────┬────────┘
                                                          │
                                                          │ controller.send_input()
                                                          ▼
                                                 ┌─────────────────┐
                                                 │  LLMController  │
                                                 │                 │
                                                 │ handle_input()  │
                                                 └────────┬────────┘
                                                          │
                                       ┌──────────────────┼──────────────────┐
                                       │                                     │
                                       ▼                                     ▼
                              ┌─────────────────┐                   ┌─────────────────┐
                              │  handle_data_   │                   │ handle_control_ │
                              │     input()     │                   │     input()     │
                              └─────────────────┘                   └─────────────────┘
ControllerInputPayload
All input is wrapped in ControllerInputPayload:
/// Envelope for every input sent from the TUI to the controller.
///
/// Instances are normally built with the `data` or `control` constructors, which keep
/// `input_type`, `control_cmd` and `turn_id` consistent with each other.
pub struct ControllerInputPayload {
/// Discriminator the controller's `handle_input` dispatches on.
pub input_type: InputType,
/// Identifier of the session this input targets.
pub session_id: i64,
/// Message text; the `control` constructor leaves this empty.
pub content: String,
/// Command to run; `Some` when built by `control`, `None` when built by `data`.
pub control_cmd: Option<ControlCmd>,
/// Turn the input belongs to; `Some` when built by `data`, `None` when built by `control`.
pub turn_id: Option<TurnId>,
}
InputType Enum
/// Kind of input carried by a `ControllerInputPayload`; selects the controller handler.
pub enum InputType {
Data, // User message to send to LLM
Control, // Control command (interrupt, clear, etc.)
}
Constructors
impl ControllerInputPayload {
    /// Builds a `Data` payload carrying a user message.
    ///
    /// `content` is converted into an owned `String`, `turn_id` is stored as `Some(..)`,
    /// and no control command is attached.
    pub fn data(session_id: i64, content: impl Into<String>, turn_id: TurnId) -> Self {
        let content = content.into();
        let turn_id = Some(turn_id);
        Self {
            session_id,
            content,
            turn_id,
            control_cmd: None,
            input_type: InputType::Data,
        }
    }

    /// Builds a `Control` payload carrying `cmd`.
    ///
    /// The message text is left empty and no turn id is attached.
    pub fn control(session_id: i64, cmd: ControlCmd) -> Self {
        let control_cmd = Some(cmd);
        Self {
            session_id,
            control_cmd,
            content: String::new(),
            turn_id: None,
            input_type: InputType::Control,
        }
    }
}
InputRouter
The InputRouter bridges the TUI channel to the controller:
/// Forwards payloads received from the TUI channel to the controller.
pub struct InputRouter {
/// Destination; each received payload is passed to its `send_input`.
controller: Arc<LLMController>,
/// Receiving end of the TUI-to-controller channel, polled by `run`.
from_tui: ToControllerRx,
/// Token that stops the `run` loop once cancelled.
cancel_token: CancellationToken,
}
impl InputRouter {
    /// Assembles a router from the controller handle, the TUI receiver and a cancellation token.
    pub fn new(
        controller: Arc<LLMController>,
        from_tui: ToControllerRx,
        cancel_token: CancellationToken,
    ) -> Self {
        Self {
            controller,
            from_tui,
            cancel_token,
        }
    }

    /// Pumps payloads from the TUI channel into the controller.
    ///
    /// The loop ends when the cancellation token fires or when `recv` yields `None`
    /// (channel closed). A failed `send_input` is logged and the loop keeps running.
    pub async fn run(mut self) {
        loop {
            tokio::select! {
                _ = self.cancel_token.cancelled() => {
                    tracing::info!("InputRouter cancelled");
                    break;
                }
                received = self.from_tui.recv() => {
                    // `None` means there is nothing more to receive from the TUI side.
                    let Some(payload) = received else {
                        tracing::info!("TUI channel closed");
                        break;
                    };
                    if let Err(err) = self.controller.send_input(payload).await {
                        tracing::error!("Failed to send input: {}", err);
                    }
                }
            }
        }
    }
}
Why InputRouter?
The router provides:
- Decoupling between TUI and controller
- Cancellation support
- Error handling for send failures
- Clean shutdown when TUI channel closes
Controller Input Handling
Main Dispatcher
/// Routes a payload to the data or control handler according to its `input_type`.
async fn handle_input(&self, payload: ControllerInputPayload) {
    match payload.input_type {
        InputType::Control => {
            self.handle_control_input(payload).await
        }
        InputType::Data => {
            self.handle_data_input(payload).await
        }
    }
}
Data Input Processing
A data input carries a user message, which the controller forwards to the target session's LLM:
/// Forwards the user message in `payload` to the LLM session it targets.
///
/// Steps: look up the session by `payload.session_id`, wrap the content in a
/// `ToLLMPayload` of type `UserMessage`, and send it to the session. If the session is
/// missing or the send reports failure, the problem is logged and reported via `emit_error`.
async fn handle_data_input(&self, payload: ControllerInputPayload) {
    let session_id = payload.session_id;
    let turn_id = payload.turn_id;

    // Step 1: Lookup session
    let Some(session) = self.session_mgr.get_session_by_id(session_id).await else {
        tracing::error!("Session {} not found", session_id);
        self.emit_error(session_id, "Session not found", turn_id);
        return;
    };

    // Step 2: Create LLM payload
    let llm_payload = ToLLMPayload {
        request_type: LLMRequestType::UserMessage,
        content: payload.content,
        tool_results: Vec::new(),
        options: None,
        // Clone rather than move: `turn_id` is still needed for the failure report in
        // step 3, and `TurnId` holds a `String`, so it is not `Copy`.
        // NOTE(review): assumes `TurnId: Clone` — confirm the derive exists.
        turn_id: turn_id.clone(),
        compact_summaries: HashMap::new(),
    };

    // Step 3: Send to session
    let sent = session.send(llm_payload).await;
    if !sent {
        tracing::error!("Failed to send to session {}", session_id);
        self.emit_error(session_id, "Failed to send message to LLM", turn_id);
    }
}
ToLLMPayload
Messages sent to LLM sessions:
/// Request handed to an LLM session through `session.send`.
pub struct ToLLMPayload {
/// Kind of request being made (see `LLMRequestType`).
pub request_type: LLMRequestType,
/// Text content; for a user message this is the text the user submitted.
pub content: String,
/// Tool execution results; left empty when a user message is sent.
pub tool_results: Vec<ToolResultInfo>,
/// Optional per-request options; `None` when a user message is sent.
pub options: Option<LLMRequestOptions>,
/// Turn this request belongs to, if any.
pub turn_id: Option<TurnId>,
/// NOTE(review): presumably context-compaction summaries keyed by an identifier — confirm.
/// Left empty when a user message is sent.
pub compact_summaries: HashMap<String, String>,
}
LLMRequestType
/// Kind of request carried by a `ToLLMPayload`.
pub enum LLMRequestType {
UserMessage, // New user message
ToolResult, // Tool execution results
}
Control Input Processing
A control input carries a ControlCmd, which the controller dispatches to the matching handler:
/// Dispatches the control command carried by `payload` to its dedicated handler.
///
/// `control_cmd` is an `Option` on a struct with public fields, so a `Control` payload
/// without a command can be constructed. That case is reported through `emit_error`
/// instead of panicking.
async fn handle_control_input(&self, payload: ControllerInputPayload) {
    let session_id = payload.session_id;

    let Some(cmd) = payload.control_cmd else {
        self.emit_error(
            session_id,
            "Control input is missing its command",
            payload.turn_id,
        );
        return;
    };

    match cmd {
        ControlCmd::Interrupt => self.handle_interrupt(session_id).await,
        ControlCmd::Shutdown => self.handle_shutdown().await,
        ControlCmd::Clear => self.handle_clear(session_id).await,
        ControlCmd::Compact => self.handle_compact(session_id).await,
    }
}
ControlCmd Enum
/// Commands a control input can carry; `handle_control_input` maps each variant to a handler.
pub enum ControlCmd {
Interrupt, // Cancel current streaming
Shutdown, // Shutdown controller
Clear, // Clear conversation history
Compact, // Trigger context compaction
}
Interrupt Handling
/// Handles `ControlCmd::Interrupt` for `session_id`.
///
/// Always emits one `CommandComplete` event: a failure with "Session not found" when the
/// lookup yields nothing, otherwise a success after the session's `interrupt` has been awaited.
async fn handle_interrupt(&self, session_id: i64) {
    let Some(session) = self.session_mgr.get_session_by_id(session_id).await else {
        // Unknown session: report the failure and stop.
        self.emit_event(ControllerEvent::CommandComplete {
            session_id,
            command: ControlCmd::Interrupt,
            success: false,
            message: Some("Session not found".to_string()),
        });
        return;
    };

    session.interrupt().await;
    tracing::info!("Interrupted session {}", session_id);

    let completed = ControllerEvent::CommandComplete {
        session_id,
        command: ControlCmd::Interrupt,
        success: true,
        message: Some("Interrupted".to_string()),
    };
    self.emit_event(completed);
}
Shutdown Handling
/// Handles `ControlCmd::Shutdown`: logs the request, then awaits the controller's `shutdown`.
///
/// Unlike the other control handlers shown here, it takes no session id and emits no
/// `CommandComplete` event itself.
async fn handle_shutdown(&self) {
tracing::info!("Shutdown requested");
self.shutdown().await;
}
Clear Handling
/// Handles `ControlCmd::Clear` for `session_id`.
///
/// Always emits one `CommandComplete` event: a failure with "Session not found" when the
/// lookup yields nothing, otherwise a success (with no message) after the session's
/// `clear_conversation` has been awaited.
async fn handle_clear(&self, session_id: i64) {
    let Some(session) = self.session_mgr.get_session_by_id(session_id).await else {
        // Unknown session: report the failure and stop.
        self.emit_event(ControllerEvent::CommandComplete {
            session_id,
            command: ControlCmd::Clear,
            success: false,
            message: Some("Session not found".to_string()),
        });
        return;
    };

    session.clear_conversation().await;
    tracing::info!("Cleared session {}", session_id);

    let completed = ControllerEvent::CommandComplete {
        session_id,
        command: ControlCmd::Clear,
        success: true,
        message: None,
    };
    self.emit_event(completed);
}
Compact Handling
/// Handles `ControlCmd::Compact` for `session_id`.
///
/// Always emits one `CommandComplete` event:
/// - failure with "Session not found" when the session lookup yields nothing, matching
///   the interrupt and clear handlers, so the requester is never left without a reply;
/// - success with the number of removed messages when `force_compact` succeeds;
/// - failure carrying the error text when `force_compact` fails.
async fn handle_compact(&self, session_id: i64) {
    let Some(session) = self.session_mgr.get_session_by_id(session_id).await else {
        self.emit_event(ControllerEvent::CommandComplete {
            session_id,
            command: ControlCmd::Compact,
            success: false,
            message: Some("Session not found".to_string()),
        });
        return;
    };

    match session.force_compact().await {
        Ok(result) => {
            tracing::info!("Compacted session {}: {:?}", session_id, result);
            self.emit_event(ControllerEvent::CommandComplete {
                session_id,
                command: ControlCmd::Compact,
                success: true,
                message: Some(format!("Compacted {} messages", result.messages_removed)),
            });
        }
        Err(e) => {
            tracing::error!("Compaction failed: {}", e);
            self.emit_event(ControllerEvent::CommandComplete {
                session_id,
                command: ControlCmd::Compact,
                success: false,
                message: Some(e.to_string()),
            });
        }
    }
}
TUI Input Creation
The TUI creates input payloads when the user submits messages:
// In App
/// Sends the text currently in the input widget to the controller as a data payload.
///
/// Does nothing when the taken content is empty. Otherwise a fresh user turn id is drawn
/// from the turn counter, and a failed `try_send` is surfaced through `show_error`.
fn submit_message(&mut self) {
    let content = self.input.take_content();
    if content.is_empty() {
        return;
    }

    // Each submitted message gets the next user turn number.
    let turn_id = TurnId::new_user_turn(self.turn_counter.next_number());
    let payload = ControllerInputPayload::data(self.current_session_id, content, turn_id);

    // Non-blocking hand-off to the controller.
    let outcome = self.to_controller_tx.try_send(payload);
    if let Err(send_err) = outcome {
        self.show_error(&format!("Failed to send: {}", send_err));
    }
}
Control Commands from TUI
// In App
/// Asks the controller to interrupt the current session.
///
/// The result of `try_send` is discarded, so a failed send goes unreported.
/// NOTE(review): `submit_message` reports the same failure via `show_error` — confirm
/// that dropping it here is intentional.
fn handle_interrupt(&mut self) {
    let session_id = self.current_session_id;
    let payload = ControllerInputPayload::control(session_id, ControlCmd::Interrupt);
    let _ = self.to_controller_tx.try_send(payload);
}
/// Asks the controller to clear the current session's conversation.
///
/// The result of `try_send` is discarded, so a failed send goes unreported.
/// NOTE(review): `submit_message` reports the same failure via `show_error` — confirm
/// that dropping it here is intentional.
fn handle_clear(&mut self) {
    let session_id = self.current_session_id;
    let payload = ControllerInputPayload::control(session_id, ControlCmd::Clear);
    let _ = self.to_controller_tx.try_send(payload);
}
Turn ID Tracking
A turn ID identifies a position in the conversation by pairing the turn's owner (user or assistant) with its sequence number:
/// Identifies one turn in a conversation by owner tag and sequence number.
pub struct TurnId {
    /// Owner tag: "u" for user, "a" for assistant.
    pub owner: String,
    /// Sequence number of the turn.
    pub number: i64,
}

impl TurnId {
    /// Shared constructor: pairs an owner tag with a turn number.
    fn tagged(owner: &str, number: i64) -> Self {
        Self {
            owner: String::from(owner),
            number,
        }
    }

    /// Creates the id of user turn `n` (owner tag "u").
    pub fn new_user_turn(n: i64) -> Self {
        Self::tagged("u", n)
    }

    /// Creates the id of assistant turn `n` (owner tag "a").
    pub fn new_assistant_turn(n: i64) -> Self {
        Self::tagged("a", n)
    }
}
// Display: "u1", "a1", "u2", "a2", etc.
TurnCounter
/// Hands out increasing turn numbers through a shared reference.
pub struct TurnCounter {
    /// Number of turns handed out so far.
    counter: AtomicI64,
}

impl TurnCounter {
    /// Creates a counter whose first `next_number` call returns 1.
    pub fn new() -> Self {
        let counter = AtomicI64::new(0);
        Self { counter }
    }

    /// Atomically advances the counter and returns the new value.
    pub fn next_number(&self) -> i64 {
        // `fetch_add` yields the value before the increment, hence the `+ 1`.
        let previous = self.counter.fetch_add(1, Ordering::SeqCst);
        previous + 1
    }
}
Error Handling
Input processing errors are emitted as events:
fn emit_error(&self, session_id: i64, error: &str, turn_id: Option<TurnId>) {
tracing::error!("Input error for session {}: {}", session_id, error);
self.emit_event(ControllerEvent::Error {
session_id,
error: error.to_string(),
turn_id,
});
}
Common errors:
- Session not found
- Failed to send to session
- Channel closed
Next Steps
- Response Handling - How LLM responses are processed
- LLMController - Controller overview
- Message Flow - Complete message routing
