Session Lifecycle

This page documents the internal lifecycle of an LLMSession, from creation through active operation to shutdown. Understanding the session lifecycle is essential for managing conversations and debugging session-related issues.

Lifecycle Overview

┌─────────────────────────────────────────────────────────────────┐
│                    INITIALIZATION PHASE                          │
│  LLMSession::new()                                               │
│  - Generate unique session ID                                    │
│  - Create LLMClient for provider                                 │
│  - Initialize channels and state                                 │
│  - Configure compactor if enabled                                │
└─────────────────────────────────────────┬───────────────────────┘
                                          │
                                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                    STARTING PHASE                                │
│  session.start() spawned as tokio task                          │
│  - Enter main processing loop                                    │
│  - Listen on to_llm_rx channel                                   │
└─────────────────────────────────────────┬───────────────────────┘
                                          │
                                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                    RUNNING PHASE                                 │
│  Processing requests in loop                                     │
│  - Receive ToLLMPayload messages                                 │
│  - Handle streaming or non-streaming requests                    │
│  - Manage conversation history                                   │
│  - Trigger compaction when needed                                │
└─────────────────────────────────────────┬───────────────────────┘
                                          │ cancel_token.cancel()
                                          ▼
┌─────────────────────────────────────────────────────────────────┐
│                    SHUTDOWN PHASE                                │
│  session.shutdown()                                              │
│  - Set shutdown flag                                             │
│  - Cancel cancellation token                                     │
│  - Exit processing loop                                          │
└─────────────────────────────────────────────────────────────────┘

Session States

Sessions have implicit states tracked by internal flags:

pub struct LLMSession {
    // State tracking
    shutdown: AtomicBool,
    cancel_token: CancellationToken,
    current_cancel: Mutex<Option<CancellationToken>>,
    // ...
}
| State         | Condition                       | Description                        |
|---------------|---------------------------------|------------------------------------|
| Initializing  | After new(), before start()     | Session created but not processing |
| Running       | start() loop active             | Accepting and processing messages  |
| Processing    | current_cancel is Some          | Actively handling an LLM request   |
| Idle          | Running, current_cancel is None | Waiting for next message           |
| Shutting Down | shutdown = true                 | Cancel token triggered             |
| Stopped       | Loop exited                     | No longer processing messages      |
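Since the states are implicit, they can only be observed by inspecting the flags. The sketch below derives a state label from the two key flags; the `classify` helper and the `u64` stand-in for the per-request token are hypothetical, introduced here only for illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
enum SessionState {
    Processing,
    Idle,
    ShuttingDown,
}

/// Hypothetical helper: derive the implicit state from the session's flags.
/// The real session never materializes a state enum.
fn classify(shutdown: &AtomicBool, current_cancel: &Mutex<Option<u64>>) -> SessionState {
    if shutdown.load(Ordering::SeqCst) {
        SessionState::ShuttingDown
    } else if current_cancel.lock().unwrap().is_some() {
        SessionState::Processing
    } else {
        SessionState::Idle
    }
}

fn main() {
    let shutdown = AtomicBool::new(false);
    let current_cancel = Mutex::new(None);
    assert_eq!(classify(&shutdown, &current_cancel), SessionState::Idle);

    // A request is in flight: current_cancel holds a token.
    *current_cancel.lock().unwrap() = Some(1);
    assert_eq!(classify(&shutdown, &current_cancel), SessionState::Processing);

    // shutdown() sets the flag, which wins over everything else.
    shutdown.store(true, Ordering::SeqCst);
    assert_eq!(classify(&shutdown, &current_cancel), SessionState::ShuttingDown);
}
```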

Phase 1: Initialization

Session initialization occurs in LLMSession::new():

pub fn new(
    config: LLMSessionConfig,
    from_llm: mpsc::Sender<FromLLMPayload>,
) -> Result<Self, LlmError> {
    // Generate unique session ID
    static SESSION_COUNTER: AtomicI64 = AtomicI64::new(0);
    let id = SESSION_COUNTER.fetch_add(1, Ordering::SeqCst);

    // Create LLM client based on provider
    let client = match config.provider {
        LLMProvider::Anthropic => LLMClient::anthropic(&config.api_key)?,
        LLMProvider::OpenAI => LLMClient::openai(&config.api_key)?,
    };

    // Create internal communication channels
    let (to_llm_tx, to_llm_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

    // Initialize compactor if configured
    let compactor = match &config.compaction {
        Some(CompactorType::Threshold(cfg)) => {
            Some(Box::new(ThresholdCompactor::new(cfg.clone())) as Box<dyn Compactor>)
        }
        Some(CompactorType::LLM(_)) => None, // LLM compactor is constructed below
        None => None,
    };

    let llm_compactor = match &config.compaction {
        Some(CompactorType::LLM(cfg)) => {
            Some(LLMCompactor::new(client.clone(), cfg.clone())?)
        }
        _ => None,
    };

    Ok(Self {
        id: AtomicI64::new(id),
        client,
        to_llm_tx,
        to_llm_rx: Mutex::new(to_llm_rx),
        from_llm,
        // context_limit must be read before `config` is moved into the struct
        context_limit: AtomicI32::new(config.context_limit),
        config,
        system_prompt: RwLock::new(None),
        max_tokens: AtomicI64::new(0),
        created_at: Instant::now(),
        conversation: RwLock::new(Arc::new(Vec::new())),
        shutdown: AtomicBool::new(false),
        cancel_token: CancellationToken::new(),
        current_cancel: Mutex::new(None),
        current_turn_id: RwLock::new(None),
        current_input_tokens: AtomicI64::new(0),
        current_output_tokens: AtomicI64::new(0),
        request_count: AtomicI64::new(0),
        tool_definitions: RwLock::new(Vec::new()),
        compactor,
        llm_compactor,
        compact_summaries: RwLock::new(HashMap::new()),
    })
}

What Happens at Initialization

| Step | Action          | Result                             |
|------|-----------------|------------------------------------|
| 1    | Generate ID     | Unique i64 from global counter     |
| 2    | Create client   | LLMClient for Anthropic or OpenAI  |
| 3    | Create channels | to_llm for receiving requests      |
| 4    | Init compactor  | ThresholdCompactor or LLMCompactor |
| 5    | Init state      | Atomic flags, RwLocks, counters    |
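Step 1 deserves a closer look: because the counter is a `static AtomicI64` and `fetch_add` is atomic, every session in the process gets a unique, monotonically increasing ID with no lock. A minimal standalone sketch of the same pattern (the `next_session_id` wrapper is hypothetical):

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Process-wide monotonically increasing IDs, mirroring the
/// static SESSION_COUNTER used inside LLMSession::new().
fn next_session_id() -> i64 {
    static SESSION_COUNTER: AtomicI64 = AtomicI64::new(0);
    // fetch_add returns the previous value, so the first ID is 0.
    SESSION_COUNTER.fetch_add(1, Ordering::SeqCst)
}

fn main() {
    let a = next_session_id();
    let b = next_session_id();
    // IDs never repeat, even if sessions are created from many threads.
    assert!(b > a);
}
```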

Phase 2: Starting

The session is started by LLMSessionManager:

// In LLMSessionManager::create_session()
let session = Arc::new(LLMSession::new(config, from_llm_tx)?);
let session_id = session.id();

// Store in sessions map
self.sessions.write().await.insert(session_id, session.clone());

// Spawn processing task
tokio::spawn(async move {
    session.start().await;
});

The start() method enters the main processing loop:

pub async fn start(&self) {
    tracing::info!("Session {} starting", self.id());

    loop {
        let mut rx_guard = self.to_llm_rx.lock().await;

        tokio::select! {
            _ = self.cancel_token.cancelled() => {
                tracing::info!("Session {} cancelled", self.id());
                break;
            }
            msg = rx_guard.recv() => {
                drop(rx_guard);  // Release lock before processing
                match msg {
                    Some(payload) => {
                        self.handle_request(payload).await;
                    }
                    None => {
                        tracing::info!("Session {} channel closed", self.id());
                        break;
                    }
                }
            }
        }
    }

    tracing::info!("Session {} stopped", self.id());
}

Phase 3: Running

During the running phase, the session processes requests:

Request Handling

async fn handle_request(&self, request: ToLLMPayload) {
    match request.request_type {
        LLMRequestType::UserMessage | LLMRequestType::ToolResult => {
            if self.config.streaming {
                self.handle_streaming_request(request).await;
            } else {
                self.handle_non_streaming_request(request).await;
            }
        }
    }
}

Request Preparation

Before each LLM call:

async fn prepare_request(&self, turn_id: Option<TurnId>) {
    // Create per-request cancellation token
    let request_cancel = CancellationToken::new();
    *self.current_cancel.lock().await = Some(request_cancel.clone());

    // Store current turn ID for interrupt filtering
    *self.current_turn_id.write().await = turn_id;
}

Processing Cycle

1. Receive ToLLMPayload from to_llm_rx
2. Call prepare_request() to setup cancellation
3. Check and trigger compaction if needed
4. Build conversation messages
5. Call LLM API (streaming or non-streaming)
6. Process response, emit FromLLMPayload events
7. Update conversation history
8. Increment request_count
9. Clear current_cancel and current_turn_id
10. Wait for next message

Phase 4: Shutdown

Shutdown can be triggered by:

  • Controller shutdown
  • Session manager removal
  • Channel closure

pub async fn shutdown(&self) {
    tracing::info!("Session {} shutting down", self.id());

    // Set shutdown flag
    self.shutdown.store(true, Ordering::SeqCst);

    // Cancel the main loop
    self.cancel_token.cancel();
}

The processing loop exits when:

  • cancel_token.cancelled() completes
  • Channel returns None (sender dropped)
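The second exit path is worth internalizing: no explicit shutdown call is needed if every sender is dropped. A simplified synchronous analogue using std's mpsc, where `recv()` returning `Err` plays the role of `to_llm_rx.recv()` yielding `None` (the loop body and counts here are illustrative, not the session's real code):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // Stand-in for the session's processing loop.
    let handle = thread::spawn(move || {
        let mut processed = 0;
        while let Ok(_msg) = rx.recv() {
            processed += 1; // handle_request(...) would run here
        }
        processed // loop exits once the channel is closed
    });

    tx.send("hello".into()).unwrap();
    tx.send("world".into()).unwrap();
    drop(tx); // dropping the last sender closes the channel

    // The loop drained both messages, then stopped on its own.
    assert_eq!(handle.join().unwrap(), 2);
}
```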

Interrupt Handling

Sessions support interrupting the current request:

pub async fn interrupt(&self) {
    // Cancel current request
    if let Some(cancel) = self.current_cancel.lock().await.take() {
        cancel.cancel();
    }

    // Remove messages from current turn
    let turn_id = self.current_turn_id.read().await.clone();
    if let Some(turn_id) = turn_id {
        let mut guard = self.conversation.write().await;
        Arc::make_mut(&mut *guard).retain(|msg| msg.turn_id() != &turn_id);
    }

    // Clear turn tracking
    *self.current_turn_id.write().await = None;
}

Interrupt:

  1. Cancels the per-request token (stops streaming)
  2. Removes all messages with the current turn ID
  3. Clears the current turn tracking
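Step 2's `Arc::make_mut` + `retain` combination is a copy-on-write edit: if another task still holds an `Arc` snapshot of the conversation, the `Vec` is cloned before mutation, so that snapshot is never invalidated. A standalone sketch with a simplified `Message` type (the `remove_turn` helper and `u64` turn IDs are illustrative stand-ins):

```rust
use std::sync::{Arc, RwLock};

#[derive(Clone)]
struct Message {
    turn_id: u64,
    text: String,
}

/// Remove every message belonging to `turn_id`, copy-on-write style,
/// mirroring the Arc::make_mut + retain step in interrupt().
fn remove_turn(conversation: &RwLock<Arc<Vec<Message>>>, turn_id: u64) {
    let mut guard = conversation.write().unwrap();
    // make_mut clones the Vec only if other Arc handles exist.
    Arc::make_mut(&mut *guard).retain(|m| m.turn_id != turn_id);
}

fn main() {
    let conversation = RwLock::new(Arc::new(vec![
        Message { turn_id: 1, text: "earlier turn".into() },
        Message { turn_id: 2, text: "partial answer".into() },
        Message { turn_id: 2, text: "tool call".into() },
    ]));

    // A reader took a snapshot before the interrupt arrived.
    let snapshot = conversation.read().unwrap().clone();

    remove_turn(&conversation, 2);

    // Turn 2 is gone from the live history...
    assert_eq!(conversation.read().unwrap().len(), 1);
    // ...but the pre-interrupt snapshot is untouched.
    assert_eq!(snapshot.len(), 3);
}
```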

Conversation Operations

Clear Conversation

pub async fn clear_conversation(&self) {
    // Clear message history
    *self.conversation.write().await = Arc::new(Vec::new());

    // Reset token counters
    self.current_input_tokens.store(0, Ordering::SeqCst);
    self.current_output_tokens.store(0, Ordering::SeqCst);

    // Clear compact summaries
    self.compact_summaries.write().await.clear();
}

Force Compaction

pub async fn force_compact(&self) -> Result<CompactionResult, CompactionError> {
    // Trigger compaction regardless of threshold
    self.compact_now().await
}

Session Accessors

Read-only access to session state:

pub fn id(&self) -> i64
pub fn model(&self) -> &str
pub fn created_at(&self) -> Instant
pub fn context_limit(&self) -> i32
pub fn is_shutdown(&self) -> bool

pub async fn conversation_len(&self) -> usize
pub async fn system_prompt(&self) -> Option<String>
pub async fn tools(&self) -> Vec<LLMTool>

Session Mutators

Modify session configuration:

pub fn set_max_tokens(&self, tokens: i64)
pub async fn set_system_prompt(&self, prompt: impl Into<String>)
pub async fn clear_system_prompt(&self)
pub async fn set_tools(&self, tools: Vec<LLMTool>)
pub async fn clear_tools(&self)

Thread Safety

Session state uses appropriate synchronization:

| Field            | Type             | Purpose           |
|------------------|------------------|-------------------|
| id               | AtomicI64        | Unique identifier |
| shutdown         | AtomicBool       | Shutdown flag     |
| max_tokens       | AtomicI64        | Runtime override  |
| context_limit    | AtomicI32        | Context window    |
| request_count    | AtomicI64        | Request counter   |
| conversation     | RwLock<Arc<Vec>> | Message history   |
| system_prompt    | RwLock<Option>   | Runtime prompt    |
| tool_definitions | RwLock<Vec>      | Tool schemas      |
| current_cancel   | Mutex<Option>    | Request token     |
| current_turn_id  | RwLock<Option>   | Turn tracking     |
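The `RwLock<Arc<Vec>>` choice for `conversation` is deliberate: a reader clones the `Arc` (a reference-count bump) and releases the lock immediately, so building a long request payload never blocks writers. A minimal sketch of that read path (the `snapshot` helper is hypothetical):

```rust
use std::sync::{Arc, RwLock};

/// Take a cheap, stable snapshot of the history: the read lock is held
/// only long enough to clone the Arc, not to iterate the messages.
fn snapshot<T>(history: &RwLock<Arc<Vec<T>>>) -> Arc<Vec<T>> {
    history.read().unwrap().clone()
}

fn main() {
    let history = RwLock::new(Arc::new(vec!["user: hi", "assistant: hello"]));

    let snap = snapshot(&history);

    // A writer can swap in a new history while the snapshot is in use.
    *history.write().unwrap() = Arc::new(Vec::new());

    assert_eq!(snap.len(), 2); // the reader's view is stable
    assert_eq!(history.read().unwrap().len(), 0); // new state visible to others
}
```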

Next Steps