Streaming
This page documents how streaming responses from LLM APIs are processed. Streaming enables real-time display of generated text as it arrives from the provider.
Streaming Overview
┌─────────────────────────────────────────────────────────────────┐
│                             LLM API                             │
│                    Server-Sent Events (SSE)                     │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ HTTP Response Stream
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                           HttpClient                            │
│                 post_stream() -> Stream<Bytes>                  │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ Byte chunks
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                           SSE Parser                            │
│               parse_sse_chunk() -> Vec<SseEvent>                │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ Parsed events
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                          Event Parser                           │
│               parse_stream_event() -> StreamEvent               │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ Typed events
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                           Application                           │
│                  Process StreamEvent variants                   │
└─────────────────────────────────────────────────────────────────┘
Streaming Request
Streaming is enabled by adding "stream": true to the request:
pub fn build_streaming_request_body(
    messages: &[Message],
    options: &MessageOptions,
    default_model: &str,
) -> Result<String, LlmError> {
    let mut body = build_request_body(messages, options, default_model)?;
    // Insert "stream":true before the closing brace
    body.pop(); // Remove the trailing '}'
    body.push_str(r#","stream":true}"#);
    Ok(body)
}
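The splice can be seen in isolation with a small standalone sketch (the `add_stream_flag` helper is hypothetical; the real code operates on the output of `build_request_body`):

```rust
// Hypothetical helper illustrating the brace splice in isolation.
fn add_stream_flag(mut body: String) -> String {
    body.pop(); // drop the trailing '}'
    body.push_str(r#","stream":true}"#); // re-close with the flag spliced in
    body
}

fn main() {
    let base = String::from(r#"{"model":"m","messages":[]}"#);
    assert_eq!(
        add_stream_flag(base),
        r#"{"model":"m","messages":[],"stream":true}"#
    );
}
```

Splicing avoids a second serialization pass, but it assumes the base body is a JSON object with no trailing whitespace after the closing brace.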
Provider Streaming Implementation
The provider returns a stream of events:
fn send_msg_stream(
    &self,
    client: &HttpClient,
    messages: &[Message],
    options: &MessageOptions,
) -> Pin<Box<dyn Future<Output = Result<
    Pin<Box<dyn Stream<Item = Result<StreamEvent, LlmError>> + Send>>,
    LlmError
>> + Send>> {
    // Clone what the async block needs so it owns its data
    // (`model` and `api_key` come from the provider's configuration;
    // field names here are illustrative).
    let messages = messages.to_vec();
    let options = options.clone();
    let model = self.model.clone();
    let api_key = self.api_key.clone();
    Box::pin(async move {
        // Build streaming request
        let body = types::build_streaming_request_body(&messages, &options, &model)?;
        let headers = types::get_request_headers(&api_key);
        // Get byte stream from HTTP client
        let byte_stream = client
            .post_stream(types::get_api_url(), &headers, &body)
            .await?;
        // Convert to event stream (`stream!` is from the async-stream crate)
        let event_stream = stream! {
            let mut buffer = String::new();
            let mut byte_stream = byte_stream;
            while let Some(chunk_result) = byte_stream.next().await {
                match chunk_result {
                    Ok(bytes) => {
                        // Append to buffer
                        if let Ok(text) = std::str::from_utf8(&bytes) {
                            buffer.push_str(text);
                        }
                        // Parse complete events, keep the remainder
                        let (events, remaining) = sse::parse_sse_chunk(&buffer);
                        buffer = remaining;
                        // Yield each complete event
                        for sse_event in events {
                            match sse::parse_stream_event(&sse_event) {
                                Ok(Some(stream_event)) => yield Ok(stream_event),
                                Ok(None) => {} // Skip unknown event types
                                Err(e) => yield Err(e),
                            }
                        }
                    }
                    Err(e) => {
                        yield Err(e);
                        break;
                    }
                }
            }
        };
        Ok(Box::pin(event_stream))
    })
}
SSE Format
Server-Sent Events use a line-oriented text format; each event is terminated by a blank line:

event: message_start
data: {"type":"message_start","message":{...}}

event: content_block_start
data: {"type":"content_block_start","index":0,...}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {"type":"message_stop"}
SSE Event Structure
#[derive(Debug)]
pub struct SseEvent {
    pub event: String, // Event type
    pub data: String,  // JSON payload
}
SSE Parsing
The parser handles chunked input with buffering:
pub fn parse_sse_chunk(buffer: &str) -> (Vec<SseEvent>, String) {
    let mut events = Vec::new();
    let mut current_event = String::new();
    let mut current_data = String::new();
    let mut remaining = String::new();
    for line in buffer.split('\n') {
        if line.is_empty() {
            // Empty line marks the end of an event
            if !current_event.is_empty() || !current_data.is_empty() {
                events.push(SseEvent {
                    event: current_event.clone(),
                    data: current_data.clone(),
                });
                current_event.clear();
                current_data.clear();
            }
        } else if let Some(event_type) = line.strip_prefix("event: ") {
            current_event = event_type.to_string();
        } else if let Some(data) = line.strip_prefix("data: ") {
            current_data = data.to_string();
        }
    }
    // Keep any incomplete event in the remaining buffer
    if !current_event.is_empty() {
        remaining.push_str("event: ");
        remaining.push_str(&current_event);
        remaining.push('\n');
    }
    if !current_data.is_empty() {
        remaining.push_str("data: ");
        remaining.push_str(&current_data);
    }
    (events, remaining)
}
StreamEvent Types
Parsed events are converted to typed variants:
pub enum StreamEvent {
    MessageStart {
        message_id: String,
        model: String,
    },
    ContentBlockStart {
        index: usize,
        block_type: ContentBlockType,
    },
    TextDelta {
        index: usize,
        text: String,
    },
    InputJsonDelta {
        index: usize,
        json: String,
    },
    ContentBlockStop {
        index: usize,
    },
    MessageDelta {
        stop_reason: Option<String>,
        usage: Option<Usage>,
    },
    MessageStop,
    Ping,
}
ContentBlockType
pub enum ContentBlockType {
    Text,
    ToolUse { id: String, name: String },
}
Usage
pub struct Usage {
    pub input_tokens: u32,
    pub output_tokens: u32,
}
Event Parsing
SSE events are parsed based on their type:
pub fn parse_stream_event(sse: &SseEvent) -> Result<Option<StreamEvent>, LlmError> {
    match sse.event.as_str() {
        "message_start" => parse_message_start(&sse.data),
        "content_block_start" => parse_content_block_start(&sse.data),
        "content_block_delta" => parse_content_block_delta(&sse.data),
        "content_block_stop" => parse_content_block_stop(&sse.data),
        "message_delta" => parse_message_delta(&sse.data),
        "message_stop" => Ok(Some(StreamEvent::MessageStop)),
        "ping" => Ok(Some(StreamEvent::Ping)),
        "error" => parse_error(&sse.data),
        _ => Ok(None), // Unknown event type
    }
}
Event Flow
A typical streaming response:
1. message_start - Message metadata (id, model)
2. content_block_start - New content block starting (text or tool_use)
3. content_block_delta - Incremental content (repeated)
4. content_block_stop - Content block complete
5. message_delta - Stop reason and usage stats
6. message_stop - Stream complete
Text Response Flow
MessageStart { id: "msg_001", model: "claude-3-sonnet" }
ContentBlockStart { index: 0, block_type: Text }
TextDelta { index: 0, text: "Hello" }
TextDelta { index: 0, text: " world" }
TextDelta { index: 0, text: "!" }
ContentBlockStop { index: 0 }
MessageDelta { stop_reason: Some("end_turn"), usage: Some(...) }
MessageStop
Tool Use Flow
MessageStart { id: "msg_002", model: "claude-3-sonnet" }
ContentBlockStart { index: 0, block_type: ToolUse { id: "call_001", name: "get_weather" } }
InputJsonDelta { index: 0, json: "{\"" }
InputJsonDelta { index: 0, json: "city" }
InputJsonDelta { index: 0, json: "\":\"" }
InputJsonDelta { index: 0, json: "NYC\"}" }
ContentBlockStop { index: 0 }
MessageDelta { stop_reason: Some("tool_use"), usage: Some(...) }
MessageStop
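Note that individual InputJsonDelta fragments are not valid JSON on their own; only their concatenation is. A quick check of the fragments from the flow above:

```rust
fn main() {
    // The four InputJsonDelta fragments, concatenated in arrival order
    let fragments = ["{\"", "city", "\":\"", "NYC\"}"];
    let joined: String = fragments.concat();
    assert_eq!(joined, "{\"city\":\"NYC\"}");
}
```

This is why tool-call arguments can only be parsed after the corresponding ContentBlockStop arrives.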
Processing Streams
Application code processes events as they arrive:
use futures::StreamExt; // for `.next()` on the event stream

let mut stream = client.send_message_stream(&messages, &options).await?;
let mut full_text = String::new();
let mut tool_json = String::new();
while let Some(event) = stream.next().await {
    match event? {
        StreamEvent::TextDelta { text, .. } => {
            print!("{}", text);
            full_text.push_str(&text);
        }
        StreamEvent::InputJsonDelta { json, .. } => {
            tool_json.push_str(&json);
        }
        StreamEvent::ContentBlockStop { .. } => {
            if !tool_json.is_empty() {
                // A complete tool call has accumulated; parse it
                let args: serde_json::Value = serde_json::from_str(&tool_json)?;
                // Execute tool...
                tool_json.clear();
            }
        }
        StreamEvent::MessageDelta { usage, .. } => {
            if let Some(usage) = usage {
                println!("\nTokens used: {}", usage.output_tokens);
            }
        }
        StreamEvent::MessageStop => break,
        _ => {}
    }
}
Buffer Management
Chunked data requires careful buffering:
let mut buffer = String::new();
while let Some(chunk) = byte_stream.next().await {
    // Append new data
    buffer.push_str(&String::from_utf8_lossy(&chunk?));
    // Parse complete events, keep the remainder
    let (events, remaining) = parse_sse_chunk(&buffer);
    buffer = remaining;
    for event in events {
        // Process event
    }
}
This handles:
- Events split across chunks
- Multiple events in a single chunk
- Partial JSON data
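The first property can be checked with a minimal, self-contained sketch (the parser from earlier is reproduced compactly so the example runs on its own):

```rust
#[derive(Debug)]
struct SseEvent {
    event: String,
    data: String,
}

// Compact copy of the parser described above, for a self-contained demo.
fn parse_sse_chunk(buffer: &str) -> (Vec<SseEvent>, String) {
    let mut events = Vec::new();
    let mut current_event = String::new();
    let mut current_data = String::new();
    for line in buffer.split('\n') {
        if line.is_empty() {
            if !current_event.is_empty() || !current_data.is_empty() {
                events.push(SseEvent {
                    event: std::mem::take(&mut current_event),
                    data: std::mem::take(&mut current_data),
                });
            }
        } else if let Some(e) = line.strip_prefix("event: ") {
            current_event = e.to_string();
        } else if let Some(d) = line.strip_prefix("data: ") {
            current_data = d.to_string();
        }
    }
    // Carry any incomplete event over to the next chunk
    let mut remaining = String::new();
    if !current_event.is_empty() {
        remaining.push_str("event: ");
        remaining.push_str(&current_event);
        remaining.push('\n');
    }
    if !current_data.is_empty() {
        remaining.push_str("data: ");
        remaining.push_str(&current_data);
    }
    (events, remaining)
}

fn main() {
    // One event split across two network chunks
    let chunk1 = "event: content_block_delta\ndata: {\"te";
    let chunk2 = "xt\":\"Hi\"}\n\n";

    // First chunk: no complete event yet; the partial event carries over
    let (events, buffer) = parse_sse_chunk(chunk1);
    assert!(events.is_empty());

    // Second chunk: append to the carried-over buffer and re-parse
    let combined = format!("{buffer}{chunk2}");
    let (events, buffer) = parse_sse_chunk(&combined);
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].event, "content_block_delta");
    assert_eq!(events[0].data, "{\"text\":\"Hi\"}");
    assert!(buffer.is_empty());
}
```

One caveat worth knowing: a chunk boundary that cuts through the `event: ` or `data: ` prefix itself (e.g. a chunk ending in `dat`) would drop that partial line, since neither prefix matches; in practice chunks are large enough that this is rare, but a production parser may buffer unterminated lines verbatim instead.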
Error Handling
Stream errors are yielded as results:
while let Some(result) = stream.next().await {
    match result {
        Ok(event) => { /* process */ }
        Err(e) => {
            tracing::error!("Stream error: {}", e);
            break;
        }
    }
}
Error Event
The API can send error events mid-stream:
"error" => {
    let parsed: serde_json::Value = serde_json::from_str(&sse.data)?;
    let error_type = parsed["error"]["type"].as_str().unwrap_or("unknown");
    let message = parsed["error"]["message"].as_str().unwrap_or("Unknown error");
    Err(LlmError::new(error_type, message))
}
Ping Events
Keep-alive pings maintain the connection:
StreamEvent::Ping => {
    // Connection is alive, no action needed
}
Next Steps
- Retry Logic - Handling rate limits
- Provider Abstraction - Provider-specific streaming
- Client Overview - LLMClient structure
