Streaming

This page documents how streaming responses from LLM APIs are processed. Streaming enables real-time display of generated text as it arrives from the provider.

Streaming Overview

┌─────────────────────────────────────────────────────────────────┐
│                    LLM API                                       │
│  Server-Sent Events (SSE)                                       │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ HTTP Response Stream
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    HttpClient                                    │
│  post_stream() -> Stream<Bytes>                                 │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ Byte chunks
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    SSE Parser                                    │
│  parse_sse_chunk() -> Vec<SseEvent>                             │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ Parsed events
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Event Parser                                  │
│  parse_stream_event() -> StreamEvent                            │
└─────────────────────────────────┬───────────────────────────────┘
                                  │ Typed events
                                  ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Application                                   │
│  Process StreamEvent variants                                    │
└─────────────────────────────────────────────────────────────────┘

Streaming Request

Streaming is enabled by adding "stream": true to the request:

pub fn build_streaming_request_body(
    messages: &[Message],
    options: &MessageOptions,
    default_model: &str,
) -> Result<String, LlmError> {
    let mut body = build_request_body(messages, options, default_model)?;
    // Insert "stream":true before the closing brace. This assumes the body is a
    // non-empty JSON object with no trailing whitespace.
    body.pop(); // Remove trailing }
    body.push_str(r#","stream":true}"#);
    Ok(body)
}
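
The string surgery above relies on build_request_body returning a non-empty JSON object that ends in a closing brace. A more defensive variant could look like the sketch below. It assumes the input is already valid JSON, and with_stream_flag is a hypothetical helper name, not part of the documented API.

```rust
/// Hypothetical helper: appends `"stream":true` to a serialized JSON object.
/// Returns `None` when the input does not look like a JSON object.
pub fn with_stream_flag(body: &str) -> Option<String> {
    let trimmed = body.trim_end();
    // Everything before the closing brace of the top-level object.
    let head = trimmed.strip_suffix('}')?;
    if !head.trim_start().starts_with('{') {
        return None;
    }
    // An empty object needs no separating comma.
    let is_empty_object = head.trim() == "{";
    let separator = if is_empty_object { "" } else { "," };
    Some(format!("{head}{separator}\"stream\":true}}"))
}
```

Returning an Option lets the caller map a malformed body to its own error type instead of sending a broken request.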

Provider Streaming Implementation

The provider returns a stream of events:

fn send_msg_stream(
    &self,
    client: &HttpClient,
    messages: &[Message],
    options: &MessageOptions,
) -> Pin<Box<dyn Future<Output = Result<
    Pin<Box<dyn Stream<Item = Result<StreamEvent, LlmError>> + Send>>,
    LlmError
>> + Send>> {
    Box::pin(async move {
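        // Build streaming request. `model` and `api_key` are assumed to be owned
        // values prepared before this block; that setup is not shown here.
        let body = types::build_streaming_request_body(&messages, &options, &model)?;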
        let headers = types::get_request_headers(&api_key);

        // Get byte stream from HTTP client
        let byte_stream = client
            .post_stream(types::get_api_url(), &headers, &body)
            .await?;

        // Convert to event stream
        let event_stream = stream! {
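            let mut buffer = String::new();
            // Bytes of a UTF-8 character that was split across two chunks
            let mut pending: Vec<u8> = Vec::new();
            let mut byte_stream = byte_stream;

            while let Some(chunk_result) = byte_stream.next().await {
                match chunk_result {
                    Ok(bytes) => {
                        // Decode only the valid UTF-8 prefix so that a chunk ending
                        // mid-character is completed by the next chunk, not dropped
                        pending.extend_from_slice(&bytes);
                        let valid_len = match std::str::from_utf8(&pending) {
                            Ok(text) => text.len(),
                            // Truncated trailing character: keep it for the next chunk
                            Err(e) if e.error_len().is_none() => e.valid_up_to(),
                            // Genuinely invalid bytes: fall back to lossy decoding
                            Err(_) => pending.len(),
                        };
                        buffer.push_str(&String::from_utf8_lossy(&pending[..valid_len]));
                        pending.drain(..valid_len);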

                        // Parse complete events
                        let (events, remaining) = sse::parse_sse_chunk(&buffer);
                        buffer = remaining;

                        // Yield each event
                        for sse_event in events {
                            match sse::parse_stream_event(&sse_event) {
                                Ok(Some(stream_event)) => yield Ok(stream_event),
                                Ok(None) => {} // Skip unknown
                                Err(e) => yield Err(e),
                            }
                        }
                    }
                    Err(e) => {
                        yield Err(e);
                        break;
                    }
                }
            }
        };

        Ok(Box::pin(event_stream))
    })
}

SSE Format

Server-Sent Events use a text-based format:

event: message_start
data: {"type":"message_start","message":{...}}

event: content_block_start
data: {"type":"content_block_start","index":0,...}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {}

SSE Event Structure

#[derive(Debug)]
pub struct SseEvent {
    pub event: String,  // Event type
    pub data: String,   // JSON payload
}

SSE Parsing

The parser handles chunked input with buffering:

pub fn parse_sse_chunk(buffer: &str) -> (Vec<SseEvent>, String) {
    let mut events = Vec::new();

    // Only the text up to the last blank line holds complete events. Everything
    // after it is returned unchanged, so partial lines and half-received events
    // survive until the next chunk arrives. Assumes LF line endings.
    let split = match buffer.rfind("\n\n") {
        Some(pos) => pos + 2,
        None => return (events, buffer.to_string()),
    };
    let (complete, remaining) = buffer.split_at(split);

    for block in complete.split("\n\n") {
        let mut event = String::new();
        let mut data = String::new();

        for line in block.lines() {
            if let Some(value) = line.strip_prefix("event:") {
                event = value.strip_prefix(' ').unwrap_or(value).to_string();
            } else if let Some(value) = line.strip_prefix("data:") {
                // Multiple data lines belong to one payload, joined by newlines
                if !data.is_empty() {
                    data.push('\n');
                }
                data.push_str(value.strip_prefix(' ').unwrap_or(value));
            }
        }

        if !event.is_empty() || !data.is_empty() {
            events.push(SseEvent { event, data });
        }
    }

    (events, remaining.to_string())
}
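
An alternative to re-parsing the buffer on every chunk is to keep the parser state in a struct and consume one complete line at a time. The sketch below illustrates that design using only the standard library. SseLineParser and feed are hypothetical names, and the SseEvent struct is redefined here only to keep the example self-contained. It also accepts CRLF line endings and joins multiple data lines with a newline.

```rust
/// Simplified mirror of the `SseEvent` struct above.
#[derive(Debug, PartialEq)]
pub struct SseEvent {
    pub event: String,
    pub data: String,
}

/// Hypothetical line-oriented parser that keeps its state between chunks.
#[derive(Default)]
pub struct SseLineParser {
    partial_line: String, // text after the last newline seen so far
    event: String,
    data: String,
}

impl SseLineParser {
    /// Feeds one decoded chunk and returns every event it completes.
    pub fn feed(&mut self, chunk: &str) -> Vec<SseEvent> {
        let mut events = Vec::new();
        self.partial_line.push_str(chunk);
        // Consume only full lines; the unfinished tail stays buffered untouched.
        while let Some(pos) = self.partial_line.find('\n') {
            let line: String = self.partial_line.drain(..=pos).collect();
            let line = line.trim_end_matches(&['\r', '\n'][..]);
            if line.is_empty() {
                // Blank line: dispatch the event collected so far, if any.
                if !self.event.is_empty() || !self.data.is_empty() {
                    events.push(SseEvent {
                        event: std::mem::take(&mut self.event),
                        data: std::mem::take(&mut self.data),
                    });
                }
            } else if let Some(value) = line.strip_prefix("event:") {
                self.event = value.strip_prefix(' ').unwrap_or(value).to_string();
            } else if let Some(value) = line.strip_prefix("data:") {
                // Multiple data lines are joined with a newline.
                if !self.data.is_empty() {
                    self.data.push('\n');
                }
                self.data.push_str(value.strip_prefix(' ').unwrap_or(value));
            }
            // Any other line (for example a comment starting with ':') is ignored.
        }
        events
    }
}
```

Because the unfinished tail is never rewritten, a chunk boundary can fall anywhere, including in the middle of a field name.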

StreamEvent Types

Parsed events are converted to typed variants:

pub enum StreamEvent {
    MessageStart {
        message_id: String,
        model: String,
    },
    ContentBlockStart {
        index: usize,
        block_type: ContentBlockType,
    },
    TextDelta {
        index: usize,
        text: String,
    },
    InputJsonDelta {
        index: usize,
        json: String,
    },
    ContentBlockStop {
        index: usize,
    },
    MessageDelta {
        stop_reason: Option<String>,
        usage: Option<Usage>,
    },
    MessageStop,
    Ping,
}

ContentBlockType

pub enum ContentBlockType {
    Text,
    ToolUse { id: String, name: String },
}

Usage

pub struct Usage {
    pub input_tokens: u32,
    pub output_tokens: u32,
}

Event Parsing

SSE events are parsed based on their type:

pub fn parse_stream_event(sse: &SseEvent) -> Result<Option<StreamEvent>, LlmError> {
    match sse.event.as_str() {
        "message_start" => parse_message_start(&sse.data),
        "content_block_start" => parse_content_block_start(&sse.data),
        "content_block_delta" => parse_content_block_delta(&sse.data),
        "content_block_stop" => parse_content_block_stop(&sse.data),
        "message_delta" => parse_message_delta(&sse.data),
        "message_stop" => Ok(Some(StreamEvent::MessageStop)),
        "ping" => Ok(Some(StreamEvent::Ping)),
        "error" => parse_error(&sse.data),
        _ => Ok(None), // Unknown event type
    }
}

Event Flow

A typical streaming response:

1. message_start     - Message metadata (id, model)
2. content_block_start - New content block starting (text or tool_use)
3. content_block_delta - Incremental content (repeated)
4. content_block_stop  - Content block complete
5. message_delta     - Stop reason and usage stats
6. message_stop      - Stream complete
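
The ordering above can be expressed as a small state machine, which is handy for checking recorded streams in tests. This is a sketch under two assumptions that the page does not state: several content blocks may follow each other within one message, and ping events may appear at any point. Kind and is_valid_flow are hypothetical names.

```rust
/// Event names only; payloads are omitted in this sketch.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Kind {
    MessageStart,
    ContentBlockStart,
    ContentBlockDelta,
    ContentBlockStop,
    MessageDelta,
    MessageStop,
    Ping,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Idle,
    InMessage,
    InBlock,
    Finishing,
    Done,
}

/// Returns true when `events` follows the order listed above.
pub fn is_valid_flow(events: &[Kind]) -> bool {
    let mut state = State::Idle;
    for &kind in events {
        state = match (state, kind) {
            // Assumption: pings are allowed anywhere and change nothing.
            (s, Kind::Ping) => s,
            (State::Idle, Kind::MessageStart) => State::InMessage,
            (State::InMessage, Kind::ContentBlockStart) => State::InBlock,
            (State::InBlock, Kind::ContentBlockDelta) => State::InBlock,
            (State::InBlock, Kind::ContentBlockStop) => State::InMessage,
            (State::InMessage, Kind::MessageDelta) => State::Finishing,
            (State::Finishing, Kind::MessageStop) => State::Done,
            // Any other transition violates the expected order.
            _ => return false,
        };
    }
    state == State::Done
}
```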

Text Response Flow

MessageStart { message_id: "msg_001", model: "claude-3-sonnet" }
ContentBlockStart { index: 0, block_type: Text }
TextDelta { index: 0, text: "Hello" }
TextDelta { index: 0, text: " world" }
TextDelta { index: 0, text: "!" }
ContentBlockStop { index: 0 }
MessageDelta { stop_reason: Some("end_turn"), usage: Some(...) }
MessageStop

Tool Use Flow

MessageStart { message_id: "msg_002", model: "claude-3-sonnet" }
ContentBlockStart { index: 0, block_type: ToolUse { id: "call_001", name: "get_weather" } }
InputJsonDelta { index: 0, json: "{\"" }
InputJsonDelta { index: 0, json: "city" }
InputJsonDelta { index: 0, json: "\":\"" }
InputJsonDelta { index: 0, json: "NYC\"}" }
ContentBlockStop { index: 0 }
MessageDelta { stop_reason: Some("tool_use"), usage: Some(...) }
MessageStop
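
The individual InputJsonDelta fragments in this flow are not valid JSON on their own. They only become parseable once every fragment for a block index has been concatenated. A minimal sketch of that accumulation, keyed by block index, follows. Event, ToolCall, and collect_tool_calls are hypothetical, trimmed-down stand-ins for the types on this page, and JSON parsing is left to the caller.

```rust
use std::collections::BTreeMap;

/// Trimmed-down mirror of the `StreamEvent` variants used in this flow.
pub enum Event {
    ToolUseStart { index: usize, id: String, name: String },
    InputJsonDelta { index: usize, json: String },
    ContentBlockStop { index: usize },
}

/// A finished tool call: identifier, tool name, and the raw JSON arguments.
#[derive(Debug, PartialEq)]
pub struct ToolCall {
    pub id: String,
    pub name: String,
    pub input_json: String,
}

/// Joins `InputJsonDelta` fragments per block index and emits a `ToolCall`
/// when the matching `ContentBlockStop` arrives.
pub fn collect_tool_calls(events: Vec<Event>) -> Vec<ToolCall> {
    let mut open: BTreeMap<usize, ToolCall> = BTreeMap::new();
    let mut done = Vec::new();
    for event in events {
        match event {
            Event::ToolUseStart { index, id, name } => {
                open.insert(index, ToolCall { id, name, input_json: String::new() });
            }
            Event::InputJsonDelta { index, json } => {
                // Fragments are only concatenated here, never parsed.
                if let Some(call) = open.get_mut(&index) {
                    call.input_json.push_str(&json);
                }
            }
            Event::ContentBlockStop { index } => {
                // The block is complete; its arguments can now be parsed.
                if let Some(call) = open.remove(&index) {
                    done.push(call);
                }
            }
        }
    }
    done
}
```

Fed the tool use flow above, this yields one call named get_weather whose input_json is {"city":"NYC"}.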

Processing Streams

Application code processes events as they arrive:

let mut stream = client.send_message_stream(&messages, &options).await?;
let mut full_text = String::new();
let mut tool_json = String::new();

while let Some(event) = stream.next().await {
    match event? {
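        StreamEvent::TextDelta { text, .. } => {
            print!("{}", text);
            // stdout is line-buffered; flush so each delta is shown immediately
            std::io::Write::flush(&mut std::io::stdout()).ok();
            full_text.push_str(&text);
        }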
        StreamEvent::InputJsonDelta { json, .. } => {
            tool_json.push_str(&json);
        }
        StreamEvent::ContentBlockStop { .. } => {
            if !tool_json.is_empty() {
                // Parse complete tool call
                let args: serde_json::Value = serde_json::from_str(&tool_json)?;
                // Execute tool...
                tool_json.clear();
            }
        }
        StreamEvent::MessageDelta { usage, .. } => {
            if let Some(usage) = usage {
                println!("\nTokens used: {}", usage.output_tokens);
            }
        }
        StreamEvent::MessageStop => break,
        _ => {}
    }
}

Buffer Management

Chunked data requires careful buffering:

let mut buffer = String::new();

while let Some(chunk) = byte_stream.next().await {
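    // Append new data. Note: from_utf8_lossy turns a multi-byte character that is
    // split across two chunks into U+FFFD replacement characters.
    buffer.push_str(&String::from_utf8_lossy(&chunk?));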

    // Parse complete events, keep remainder
    let (events, remaining) = parse_sse_chunk(&buffer);
    buffer = remaining;

    for event in events {
        // Process event
    }
}

This handles:

  • Events split across chunks
  • Multiple events in a single chunk
  • Partial JSON data
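
One case the loop above does not cover is a multi-byte UTF-8 character split across two chunks, which lossy decoding replaces with U+FFFD. A minimal sketch of chunk-safe decoding using only the standard library follows. Utf8ChunkDecoder is a hypothetical name, not part of the documented code.

```rust
/// Hypothetical helper that decodes a byte stream chunk by chunk.
#[derive(Default)]
pub struct Utf8ChunkDecoder {
    pending: Vec<u8>, // trailing bytes of a character that is not complete yet
}

impl Utf8ChunkDecoder {
    /// Returns the text that became decodable after appending `chunk`.
    pub fn push(&mut self, chunk: &[u8]) -> String {
        self.pending.extend_from_slice(chunk);
        let mut out = String::new();
        loop {
            match std::str::from_utf8(&self.pending) {
                Ok(text) => {
                    out.push_str(text);
                    self.pending.clear();
                    return out;
                }
                Err(err) => {
                    let valid = err.valid_up_to();
                    out.push_str(&String::from_utf8_lossy(&self.pending[..valid]));
                    match err.error_len() {
                        // Truncated character at the end: wait for more bytes.
                        None => {
                            self.pending.drain(..valid);
                            return out;
                        }
                        // Genuinely invalid bytes: replace them and continue.
                        Some(bad) => {
                            out.push('\u{FFFD}');
                            self.pending.drain(..valid + bad);
                        }
                    }
                }
            }
        }
    }
}
```

The decoded text can then be appended to the SSE buffer in place of the from_utf8_lossy call.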

Error Handling

Stream errors are yielded as results:

while let Some(result) = stream.next().await {
    match result {
        Ok(event) => { /* process */ }
        Err(e) => {
            tracing::error!("Stream error: {}", e);
            break;
        }
    }
}

Error Event

The API can send error events mid-stream:

"error" => {
    let parsed: serde_json::Value = serde_json::from_str(&sse.data)?;
    let error_type = parsed["error"]["type"].as_str().unwrap_or("unknown");
    let message = parsed["error"]["message"].as_str().unwrap_or("Unknown error");
    Err(LlmError::new(error_type, message))
}

Ping Events

Keep-alive pings maintain the connection:

StreamEvent::Ping => {
    // Connection is alive, no action needed
}

Next Steps