Input Processing

This page documents how user input flows from the TUI through the InputRouter to the LLMController, and how different input types are processed.

Input Flow Overview

┌─────────────────┐    ControllerInputPayload    ┌─────────────────┐
│      TUI        │─────────────────────────────▶│  InputRouter    │
│                 │   to_controller_tx.send()    │                 │
└─────────────────┘                              └────────┬────────┘

                                                          │ controller.send_input()

                                                 ┌─────────────────┐
                                                 │  LLMController  │
                                                 │                 │
                                                 │  handle_input() │
                                                 └────────┬────────┘

                                       ┌──────────────────┼──────────────────┐
                                       │                                     │
                                       ▼                                     ▼
                              ┌─────────────────┐                   ┌─────────────────┐
                              │ handle_data_    │                   │ handle_control_ │
                              │ input()         │                   │ input()         │
                              └─────────────────┘                   └─────────────────┘

ControllerInputPayload

All input is wrapped in ControllerInputPayload:

pub struct ControllerInputPayload {
    pub input_type: InputType,
    pub session_id: i64,
    pub content: String,
    pub control_cmd: Option<ControlCmd>,
    pub turn_id: Option<TurnId>,
}

InputType Enum

pub enum InputType {
    Data,     // User message to send to LLM
    Control,  // Control command (interrupt, clear, etc.)
}

Constructors

impl ControllerInputPayload {
    /// Create a data input (user message)
    pub fn data(session_id: i64, content: impl Into<String>, turn_id: TurnId) -> Self {
        Self {
            input_type: InputType::Data,
            session_id,
            content: content.into(),
            control_cmd: None,
            turn_id: Some(turn_id),
        }
    }

    /// Create a control input (command)
    pub fn control(session_id: i64, cmd: ControlCmd) -> Self {
        Self {
            input_type: InputType::Control,
            session_id,
            content: String::new(),
            control_cmd: Some(cmd),
            turn_id: None,
        }
    }
}

InputRouter

The InputRouter bridges the TUI channel to the controller:

pub struct InputRouter {
    controller: Arc<LLMController>,
    from_tui: ToControllerRx,
    cancel_token: CancellationToken,
}

impl InputRouter {
    pub fn new(
        controller: Arc<LLMController>,
        from_tui: ToControllerRx,
        cancel_token: CancellationToken,
    ) -> Self {
        Self { controller, from_tui, cancel_token }
    }

    pub async fn run(mut self) {
        loop {
            tokio::select! {
                _ = self.cancel_token.cancelled() => {
                    tracing::info!("InputRouter cancelled");
                    break;
                }
                msg = self.from_tui.recv() => {
                    match msg {
                        Some(payload) => {
                            if let Err(e) = self.controller.send_input(payload).await {
                                tracing::error!("Failed to send input: {}", e);
                            }
                        }
                        None => {
                            tracing::info!("TUI channel closed");
                            break;
                        }
                    }
                }
            }
        }
    }
}

Why InputRouter?

The router provides:

  • Decoupling between TUI and controller
  • Cancellation support
  • Error handling for send failures
  • Clean shutdown when TUI channel closes

Controller Input Handling

Main Dispatcher

async fn handle_input(&self, payload: ControllerInputPayload) {
    match payload.input_type {
        InputType::Data => self.handle_data_input(payload).await,
        InputType::Control => self.handle_control_input(payload).await,
    }
}

Data Input Processing

Data input sends user messages to the LLM:

async fn handle_data_input(&self, payload: ControllerInputPayload) {
    let session_id = payload.session_id;
    let turn_id = payload.turn_id;

    // Step 1: Lookup session
    let Some(session) = self.session_mgr.get_session_by_id(session_id).await else {
        tracing::error!("Session {} not found", session_id);
        self.emit_error(session_id, "Session not found", turn_id);
        return;
    };

    // Step 2: Create LLM payload
    let llm_payload = ToLLMPayload {
        request_type: LLMRequestType::UserMessage,
        content: payload.content,
        tool_results: Vec::new(),
        options: None,
        turn_id,
        compact_summaries: HashMap::new(),
    };

    // Step 3: Send to session
    let sent = session.send(llm_payload).await;
    if !sent {
        tracing::error!("Failed to send to session {}", session_id);
        self.emit_error(session_id, "Failed to send message to LLM", turn_id);
    }
}

ToLLMPayload

Messages sent to LLM sessions:

pub struct ToLLMPayload {
    pub request_type: LLMRequestType,
    pub content: String,
    pub tool_results: Vec<ToolResultInfo>,
    pub options: Option<LLMRequestOptions>,
    pub turn_id: Option<TurnId>,
    pub compact_summaries: HashMap<String, String>,
}

LLMRequestType

pub enum LLMRequestType {
    UserMessage,  // New user message
    ToolResult,   // Tool execution results
}

Control Input Processing

Control input handles commands:

async fn handle_control_input(&self, payload: ControllerInputPayload) {
    let session_id = payload.session_id;
    let cmd = payload.control_cmd.unwrap();

    match cmd {
        ControlCmd::Interrupt => self.handle_interrupt(session_id).await,
        ControlCmd::Shutdown => self.handle_shutdown().await,
        ControlCmd::Clear => self.handle_clear(session_id).await,
        ControlCmd::Compact => self.handle_compact(session_id).await,
    }
}

ControlCmd Enum

pub enum ControlCmd {
    Interrupt,  // Cancel current streaming
    Shutdown,   // Shutdown controller
    Clear,      // Clear conversation history
    Compact,    // Trigger context compaction
}

Interrupt Handling

async fn handle_interrupt(&self, session_id: i64) {
    if let Some(session) = self.session_mgr.get_session_by_id(session_id).await {
        session.interrupt().await;
        tracing::info!("Interrupted session {}", session_id);

        self.emit_event(ControllerEvent::CommandComplete {
            session_id,
            command: ControlCmd::Interrupt,
            success: true,
            message: Some("Interrupted".to_string()),
        });
    } else {
        self.emit_event(ControllerEvent::CommandComplete {
            session_id,
            command: ControlCmd::Interrupt,
            success: false,
            message: Some("Session not found".to_string()),
        });
    }
}

Shutdown Handling

async fn handle_shutdown(&self) {
    tracing::info!("Shutdown requested");
    self.shutdown().await;
}

Clear Handling

async fn handle_clear(&self, session_id: i64) {
    if let Some(session) = self.session_mgr.get_session_by_id(session_id).await {
        session.clear_conversation().await;
        tracing::info!("Cleared session {}", session_id);

        self.emit_event(ControllerEvent::CommandComplete {
            session_id,
            command: ControlCmd::Clear,
            success: true,
            message: None,
        });
    } else {
        self.emit_event(ControllerEvent::CommandComplete {
            session_id,
            command: ControlCmd::Clear,
            success: false,
            message: Some("Session not found".to_string()),
        });
    }
}

Compact Handling

async fn handle_compact(&self, session_id: i64) {
    if let Some(session) = self.session_mgr.get_session_by_id(session_id).await {
        match session.force_compact().await {
            Ok(result) => {
                tracing::info!("Compacted session {}: {:?}", session_id, result);
                self.emit_event(ControllerEvent::CommandComplete {
                    session_id,
                    command: ControlCmd::Compact,
                    success: true,
                    message: Some(format!("Compacted {} messages", result.messages_removed)),
                });
            }
            Err(e) => {
                tracing::error!("Compaction failed: {}", e);
                self.emit_event(ControllerEvent::CommandComplete {
                    session_id,
                    command: ControlCmd::Compact,
                    success: false,
                    message: Some(e.to_string()),
                });
            }
        }
    }
}

TUI Input Creation

The TUI creates input payloads when the user submits messages:

// In App
fn submit_message(&mut self) {
    let content = self.input.take_content();
    if content.is_empty() {
        return;
    }

    // Increment turn counter
    let turn_number = self.turn_counter.next_number();
    let turn_id = TurnId::new_user_turn(turn_number);

    // Create payload
    let payload = ControllerInputPayload::data(
        self.current_session_id,
        content,
        turn_id,
    );

    // Send to controller
    if let Err(e) = self.to_controller_tx.try_send(payload) {
        self.show_error(&format!("Failed to send: {}", e));
    }
}

Control Commands from TUI

// In App
fn handle_interrupt(&mut self) {
    let payload = ControllerInputPayload::control(
        self.current_session_id,
        ControlCmd::Interrupt,
    );
    self.to_controller_tx.try_send(payload).ok();
}

fn handle_clear(&mut self) {
    let payload = ControllerInputPayload::control(
        self.current_session_id,
        ControlCmd::Clear,
    );
    self.to_controller_tx.try_send(payload).ok();
}

Turn ID Tracking

Turn IDs track conversation position:

pub struct TurnId {
    pub owner: String,  // "u" for user, "a" for assistant
    pub number: i64,
}

impl TurnId {
    pub fn new_user_turn(n: i64) -> Self {
        Self { owner: "u".to_string(), number: n }
    }

    pub fn new_assistant_turn(n: i64) -> Self {
        Self { owner: "a".to_string(), number: n }
    }
}

// Display: "u1", "a1", "u2", "a2", etc.

TurnCounter

pub struct TurnCounter {
    counter: AtomicI64,
}

impl TurnCounter {
    pub fn new() -> Self {
        Self { counter: AtomicI64::new(0) }
    }

    pub fn next_number(&self) -> i64 {
        self.counter.fetch_add(1, Ordering::SeqCst) + 1
    }
}

Error Handling

Input processing errors are emitted as events:

fn emit_error(&self, session_id: i64, error: &str, turn_id: Option<TurnId>) {
    tracing::error!("Input error for session {}: {}", session_id, error);

    self.emit_event(ControllerEvent::Error {
        session_id,
        error: error.to_string(),
        turn_id,
    });
}

Common errors:

  • Session not found
  • Failed to send to session
  • Channel closed

Next Steps