Session Lifecycle
This page documents the internal lifecycle of an LLMSession, from creation through active operation to shutdown. Understanding the session lifecycle is essential for managing conversations and debugging session-related issues.
Lifecycle Overview
┌─────────────────────────────────────────────────────────────────┐
│ INITIALIZATION PHASE │
│ LLMSession::new() │
│ - Generate unique session ID │
│ - Create LLMClient for provider │
│ - Initialize channels and state │
│ - Configure compactor if enabled │
└─────────────────────────────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ STARTING PHASE │
│ session.start() spawned as tokio task │
│ - Enter main processing loop │
│ - Listen on to_llm_rx channel │
└─────────────────────────────────────────┬───────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ RUNNING PHASE │
│ Processing requests in loop │
│ - Receive ToLLMPayload messages │
│ - Handle streaming or non-streaming requests │
│ - Manage conversation history │
│ - Trigger compaction when needed │
└─────────────────────────────────────────┬───────────────────────┘
│ cancel_token.cancel()
▼
┌─────────────────────────────────────────────────────────────────┐
│ SHUTDOWN PHASE │
│ session.shutdown() │
│ - Set shutdown flag │
│ - Cancel cancellation token │
│ - Exit processing loop │
└─────────────────────────────────────────────────────────────────┘
Session States
Sessions have implicit states tracked by internal flags:
pub struct LLMSession {
// State tracking
shutdown: AtomicBool,
cancel_token: CancellationToken,
current_cancel: Mutex<Option<CancellationToken>>,
// ...
}
| State | Condition | Description |
|---|---|---|
| Initializing | After new(), before start() | Session created but not processing |
| Running | start() loop active | Accepting and processing messages |
| Processing | current_cancel is Some | Actively handling an LLM request |
| Idle | Running, current_cancel is None | Waiting for next message |
| Shutting Down | shutdown = true | Cancel token triggered |
| Stopped | Loop exited | No longer processing messages |
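The table above can be collapsed into a small helper. This is a hypothetical sketch — `SessionState` and `classify` are illustrative names, not part of the session's API, which only exposes `is_shutdown()`:

```rust
// Hypothetical helper deriving a coarse state from the two flags above.
// `SessionState` and `classify` are illustrative, not part of the API.
#[derive(Debug, PartialEq)]
enum SessionState {
    ShuttingDown,
    Processing,
    Idle,
}

fn classify(shutdown: bool, has_current_cancel: bool) -> SessionState {
    if shutdown {
        SessionState::ShuttingDown
    } else if has_current_cancel {
        SessionState::Processing
    } else {
        SessionState::Idle
    }
}
```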
Phase 1: Initialization
Session initialization occurs in LLMSession::new():
pub fn new(
config: LLMSessionConfig,
from_llm: mpsc::Sender<FromLLMPayload>,
) -> Result<Self, LlmError> {
// Generate unique session ID
static SESSION_COUNTER: AtomicI64 = AtomicI64::new(0);
let id = SESSION_COUNTER.fetch_add(1, Ordering::SeqCst);
// Create LLM client based on provider
let client = match config.provider {
LLMProvider::Anthropic => LLMClient::anthropic(&config.api_key)?,
LLMProvider::OpenAI => LLMClient::openai(&config.api_key)?,
};
// Create internal communication channels
let (to_llm_tx, to_llm_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
// Initialize compactor if configured
let compactor = match &config.compaction {
Some(CompactorType::Threshold(cfg)) => {
Some(Box::new(ThresholdCompactor::new(cfg.clone())) as Box<dyn Compactor>)
}
Some(CompactorType::LLM(_)) => None, // LLM compactor handled separately
None => None,
};
let llm_compactor = match &config.compaction {
Some(CompactorType::LLM(cfg)) => {
Some(LLMCompactor::new(client.clone(), cfg.clone())?)
}
_ => None,
};
    Ok(Self {
        id: AtomicI64::new(id),
        client,
        to_llm_tx,
        to_llm_rx: Mutex::new(to_llm_rx),
        from_llm,
        system_prompt: RwLock::new(None),
        max_tokens: AtomicI64::new(0),
        created_at: Instant::now(),
        conversation: RwLock::new(Arc::new(Vec::new())),
        shutdown: AtomicBool::new(false),
        cancel_token: CancellationToken::new(),
        current_cancel: Mutex::new(None),
        current_turn_id: RwLock::new(None),
        current_input_tokens: AtomicI64::new(0),
        current_output_tokens: AtomicI64::new(0),
        request_count: AtomicI64::new(0),
        tool_definitions: RwLock::new(Vec::new()),
        compactor,
        llm_compactor,
        // Field initializers run in the order written, so read from
        // `config` before it is moved into the struct below.
        context_limit: AtomicI32::new(config.context_limit),
        compact_summaries: RwLock::new(HashMap::new()),
        config,
    })
}
What Happens at Initialization
| Step | Action | Result |
|---|---|---|
| 1 | Generate ID | Unique i64 from global counter |
| 2 | Create client | LLMClient for Anthropic or OpenAI |
| 3 | Create channels | to_llm for receiving requests |
| 4 | Init compactor | ThresholdCompactor or LLMCompactor |
| 5 | Init state | Atomic flags, RwLocks, counters |
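The ID step can be reproduced in isolation. This stand-alone sketch mirrors the counter used in `new()`:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Process-wide counter, as in LLMSession::new(): each call hands out the
// next i64, so IDs are unique for the lifetime of the process.
static SESSION_COUNTER: AtomicI64 = AtomicI64::new(0);

fn next_session_id() -> i64 {
    SESSION_COUNTER.fetch_add(1, Ordering::SeqCst)
}
```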
Phase 2: Starting
The session is started by LLMSessionManager:
// In LLMSessionManager::create_session()
let session = Arc::new(LLMSession::new(config, from_llm_tx)?);
let session_id = session.id();
// Store in sessions map
self.sessions.write().await.insert(session_id, session.clone());
// Spawn processing task
tokio::spawn(async move {
session.start().await;
});
The start() method enters the main processing loop:
pub async fn start(&self) {
tracing::info!("Session {} starting", self.id());
loop {
let mut rx_guard = self.to_llm_rx.lock().await;
tokio::select! {
_ = self.cancel_token.cancelled() => {
tracing::info!("Session {} cancelled", self.id());
break;
}
msg = rx_guard.recv() => {
drop(rx_guard); // Release lock before processing
match msg {
Some(payload) => {
self.handle_request(payload).await;
}
None => {
tracing::info!("Session {} channel closed", self.id());
break;
}
}
}
}
}
tracing::info!("Session {} stopped", self.id());
}
Phase 3: Running
During the running phase, the session processes requests:
Request Handling
async fn handle_request(&self, request: ToLLMPayload) {
match request.request_type {
LLMRequestType::UserMessage | LLMRequestType::ToolResult => {
if self.config.streaming {
self.handle_streaming_request(request).await;
} else {
self.handle_non_streaming_request(request).await;
}
}
}
}
Request Preparation
Before each LLM call:
async fn prepare_request(&self, turn_id: Option<TurnId>) {
// Create per-request cancellation token
let request_cancel = CancellationToken::new();
*self.current_cancel.lock().await = Some(request_cancel.clone());
// Store current turn ID for interrupt filtering
*self.current_turn_id.write().await = turn_id;
}
Processing Cycle
1. Receive ToLLMPayload from to_llm_rx
2. Call prepare_request() to set up cancellation
3. Check and trigger compaction if needed
4. Build conversation messages
5. Call LLM API (streaming or non-streaming)
6. Process response, emit FromLLMPayload events
7. Update conversation history
8. Increment request_count
9. Clear current_cancel and current_turn_id
10. Wait for next message
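Steps 2 and 9 revolve around the per-request token. As a simplified stand-in (an `AtomicBool` flag instead of tokio-util's `CancellationToken`), the mechanics look like this:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

// Simplified stand-in for the per-request CancellationToken: cloned
// handles share one flag; interrupt() flips it, and the streaming loop
// checks it between chunks.
#[derive(Clone)]
struct RequestCancel(Arc<AtomicBool>);

impl RequestCancel {
    fn new() -> Self {
        RequestCancel(Arc::new(AtomicBool::new(false)))
    }
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}
```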
Phase 4: Shutdown
Shutdown can be triggered by:
- Controller shutdown
- Session manager removal
- Channel closure
pub async fn shutdown(&self) {
tracing::info!("Session {} shutting down", self.id());
// Set shutdown flag
self.shutdown.store(true, Ordering::SeqCst);
// Cancel the main loop
self.cancel_token.cancel();
}
The processing loop exits when:
- cancel_token.cancelled() completes
- Channel returns None (sender dropped)
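The second exit condition can be observed with std's channel, which behaves the same way: once every sender is dropped, `recv()` reports closure and the loop breaks.

```rust
use std::sync::mpsc;

// Drains a channel until it is closed, mirroring the session loop's
// `None => break` branch (std's recv() returns Err instead of None).
fn drain_until_closed(rx: mpsc::Receiver<&'static str>) -> usize {
    let mut count = 0;
    while rx.recv().is_ok() {
        count += 1;
    }
    count
}
```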
Interrupt Handling
Sessions support interrupting the current request:
pub async fn interrupt(&self) {
// Cancel current request
if let Some(cancel) = self.current_cancel.lock().await.take() {
cancel.cancel();
}
// Remove messages from current turn
let turn_id = self.current_turn_id.read().await.clone();
if let Some(turn_id) = turn_id {
let mut guard = self.conversation.write().await;
Arc::make_mut(&mut *guard).retain(|msg| msg.turn_id() != &turn_id);
}
// Clear turn tracking
*self.current_turn_id.write().await = None;
}
Interrupt:
- Cancels the per-request token (stops streaming)
- Removes all messages with the current turn ID
- Clears the current turn tracking
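The turn-removal step relies on copy-on-write: `Arc::make_mut` clones the history only if readers still hold a snapshot. A minimal sketch, with `(turn_id, text)` pairs standing in for messages:

```rust
use std::sync::Arc;

// Drops every message tagged with the interrupted turn. If another task
// still holds the old Arc, make_mut clones the Vec first, so existing
// snapshots are untouched.
fn remove_turn(conversation: &mut Arc<Vec<(u64, &'static str)>>, turn_id: u64) {
    Arc::make_mut(conversation).retain(|(id, _)| *id != turn_id);
}
```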
Conversation Operations
Clear Conversation
pub async fn clear_conversation(&self) {
// Clear message history
*self.conversation.write().await = Arc::new(Vec::new());
// Reset token counters
self.current_input_tokens.store(0, Ordering::SeqCst);
self.current_output_tokens.store(0, Ordering::SeqCst);
// Clear compact summaries
self.compact_summaries.write().await.clear();
}
Force Compaction
pub async fn force_compact(&self) -> Result<CompactionResult, CompactionError> {
// Trigger compaction regardless of threshold
self.compact_now().await
}
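force_compact() bypasses whatever trigger the configured compactor uses. As a hedged sketch of a threshold rule like the one ThresholdCompactor presumably applies (the threshold fraction and `should_compact` are illustrative, not the actual implementation):

```rust
// Illustrative threshold rule: compact once used tokens reach a fraction
// of the context window, or unconditionally when forced.
fn should_compact(used_tokens: i64, context_limit: i64, threshold: f64, force: bool) -> bool {
    force || (used_tokens as f64) >= (context_limit as f64) * threshold
}
```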
Session Accessors
Read-only access to session state:
pub fn id(&self) -> i64
pub fn model(&self) -> &str
pub fn created_at(&self) -> Instant
pub fn context_limit(&self) -> i32
pub fn is_shutdown(&self) -> bool
pub async fn conversation_len(&self) -> usize
pub async fn system_prompt(&self) -> Option<String>
pub async fn tools(&self) -> Vec<LLMTool>
Session Mutators
Modify session configuration:
pub fn set_max_tokens(&self, tokens: i64)
pub async fn set_system_prompt(&self, prompt: impl Into<String>)
pub async fn clear_system_prompt(&self)
pub async fn set_tools(&self, tools: Vec<LLMTool>)
pub async fn clear_tools(&self)
Thread Safety
Session state uses appropriate synchronization:
| Field | Type | Purpose |
|---|---|---|
| id | AtomicI64 | Unique identifier |
| shutdown | AtomicBool | Shutdown flag |
| max_tokens | AtomicI64 | Runtime override |
| context_limit | AtomicI32 | Context window |
| request_count | AtomicI64 | Request counter |
| conversation | RwLock<Arc<Vec>> | Message history |
| system_prompt | RwLock<Option> | Runtime prompt |
| tool_definitions | RwLock<Vec> | Tool schemas |
| current_cancel | Mutex<Option> | Request token |
| current_turn_id | RwLock<Option> | Turn tracking |
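The RwLock<Arc<Vec>> choice for conversation enables cheap snapshots: a reader clones the Arc under a brief read lock and then iterates without holding any lock, so long traversals never block writers. Sketched here with std's synchronous RwLock (the session itself uses tokio's async locks):

```rust
use std::sync::{Arc, RwLock};

// Clone the Arc under a short read lock; the returned snapshot stays
// valid even if a writer swaps in a new history afterwards.
fn snapshot(history: &RwLock<Arc<Vec<String>>>) -> Arc<Vec<String>> {
    history.read().unwrap().clone()
}
```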
Next Steps
- Session Creation - How sessions are created
- Status Tracking - Session status monitoring
- Context Management - Conversation state
