Event Sink

The EventSink trait defines how your application receives events from the agent. Every streaming text chunk, tool execution, permission request, and completion signal flows through your sink implementation. By implementing this trait, you control how agent output reaches your users—whether through WebSocket messages, HTTP responses, log files, or any other delivery mechanism.

The sink abstraction decouples the agent’s event generation from event delivery. The agent produces events without knowing how they’ll be consumed, and your sink handles delivery without knowing how events were produced. This separation enables flexible integrations across different transport protocols and application architectures.


The EventSink Trait

The trait defines two methods for sending events, plus a cloning method for trait object support. Most implementations only need to implement the synchronous send() method; the async version has a default implementation.

pub trait EventSink: Send + Sync {
    /// Send an event without waiting for delivery confirmation
    fn send(&self, event: UiMessage) -> Result<(), SendError>;

    /// Send an event asynchronously with backpressure support
    fn send_async(&self, event: UiMessage)
        -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>> {
        Box::pin(async move { self.send(event) })
    }

    /// Clone this sink as a trait object
    fn clone_box(&self) -> Box<dyn EventSink>;
}

The send() method should be non-blocking. If your transport can’t accept events immediately, buffer them internally rather than blocking the agent. The send_async() method supports backpressure for sinks that need to await delivery, such as bounded channels.


Built-in Implementations

Agent Air provides two built-in sink implementations for common scenarios. These work well for many use cases and serve as reference implementations for custom sinks.

ChannelEventSink

Wraps an async channel sender, delivering events through Tokio’s mpsc channel. This is the default sink used by the TUI and works well when you want to process events in a separate task.

use agent_air::sink::ChannelEventSink;
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(500);
let sink = ChannelEventSink::new(tx);

// Process events in another task
tokio::spawn(async move {
    while let Some(event) = rx.recv().await {
        handle_event(event);
    }
});

The channel provides natural backpressure—when the receiver falls behind, the sender slows down. Configure the buffer size based on your processing speed and memory constraints.

SimpleEventSink

Wraps a callback function, invoking it synchronously for each event. This is convenient for simple integrations where you want to handle events inline without channel overhead.

use agent_air::sink::SimpleEventSink;

let sink = SimpleEventSink::new(|event| {
    match event {
        UiMessage::TextChunk { text, .. } => print!("{}", text),
        UiMessage::Complete { .. } => println!("\n---Done---"),
        _ => {}
    }
});

Keep the callback fast since it runs synchronously in the event path. For slow operations like network I/O, spawn a task or use a channel-based sink instead.


Implementing Custom Sinks

Create custom sinks by implementing the EventSink trait. Your implementation decides how to serialize, buffer, and deliver events based on your transport requirements.

WebSocket Sink Example

A sink that serializes events to JSON and sends them over a WebSocket connection:

use agent_air::{EventSink, UiMessage, SendError};
use tokio::sync::mpsc;
use std::sync::Arc;

pub struct WebSocketSink {
    tx: mpsc::Sender<String>,
}

impl WebSocketSink {
    pub fn new(tx: mpsc::Sender<String>) -> Self {
        Self { tx }
    }
}

impl EventSink for WebSocketSink {
    fn send(&self, event: UiMessage) -> Result<(), SendError> {
        let json = serde_json::to_string(&event)
            .map_err(|e| SendError::SerializationError(e.to_string()))?;

        self.tx.try_send(json)
            .map_err(|_| SendError::ChannelClosed)?;

        Ok(())
    }

    fn clone_box(&self) -> Box<dyn EventSink> {
        Box::new(Self { tx: self.tx.clone() })
    }
}

The sink sends JSON strings to a channel; a separate task reads from that channel and writes to the actual WebSocket. This separation keeps the sink non-blocking.

HTTP SSE Sink Example

A sink for Server-Sent Events that formats events according to the SSE protocol:

use agent_air::{EventSink, UiMessage, SendError};
use tokio::sync::broadcast;

pub struct SseSink {
    tx: broadcast::Sender<String>,
}

impl SseSink {
    pub fn new(tx: broadcast::Sender<String>) -> Self {
        Self { tx }
    }
}

impl EventSink for SseSink {
    fn send(&self, event: UiMessage) -> Result<(), SendError> {
        let event_type = match &event {
            UiMessage::TextChunk { .. } => "text",
            UiMessage::ToolExecuting { .. } => "tool_start",
            UiMessage::ToolCompleted { .. } => "tool_end",
            UiMessage::Complete { .. } => "complete",
            UiMessage::Error { .. } => "error",
            _ => "event",
        };

        let data = serde_json::to_string(&event)
            .map_err(|e| SendError::SerializationError(e.to_string()))?;

        let sse_message = format!("event: {}\ndata: {}\n\n", event_type, data);

        let _ = self.tx.send(sse_message);  // Ignore if no receivers

        Ok(())
    }

    fn clone_box(&self) -> Box<dyn EventSink> {
        Box::new(Self { tx: self.tx.clone() })
    }
}

Broadcast channels allow multiple SSE clients to receive the same events. Each client subscribes to the channel and receives a copy of each message.


Handling Backpressure

When your transport can’t keep up with event generation, you need a backpressure strategy. The sink interface supports several approaches depending on your requirements.

Buffering

Buffer events internally and drain them when the transport is ready. This works well for bursty traffic but requires memory proportional to the burst size.

impl EventSink for BufferingSink {
    fn send(&self, event: UiMessage) -> Result<(), SendError> {
        let mut buffer = self.buffer.lock().unwrap();
        buffer.push(event);

        // Wake the drain task if needed
        self.notify.notify_one();

        Ok(())
    }
}

Dropping

Drop events when the buffer is full. Acceptable for events like TokenUpdate where the latest value supersedes previous ones, but not for TextChunk which would cause garbled output.

impl EventSink for DroppingSink {
    fn send(&self, event: UiMessage) -> Result<(), SendError> {
        match self.tx.try_send(event) {
            Ok(()) => Ok(()),
            Err(TrySendError::Full(_)) => {
                // Log and continue, accepting data loss
                log::warn!("Event dropped due to backpressure");
                Ok(())
            }
            Err(TrySendError::Disconnected(_)) => Err(SendError::ChannelClosed),
        }
    }
}

Async Waiting

Use send_async() to wait until the transport can accept the event. This provides true backpressure but may slow down the agent if the transport is persistently slow.

impl EventSink for BackpressureSink {
    fn send_async(&self, event: UiMessage)
        -> Pin<Box<dyn Future<Output = Result<(), SendError>> + Send + '_>>
    {
        Box::pin(async move {
            self.tx.send(event).await
                .map_err(|_| SendError::ChannelClosed)
        })
    }
}

Event Filtering

Not all events are relevant to every consumer. Filter events in your sink to reduce bandwidth and processing overhead.

impl EventSink for FilteringSink {
    fn send(&self, event: UiMessage) -> Result<(), SendError> {
        // Only forward user-visible events
        let should_forward = matches!(
            event,
            UiMessage::TextChunk { .. } |
            UiMessage::ToolExecuting { .. } |
            UiMessage::ToolCompleted { .. } |
            UiMessage::Complete { .. } |
            UiMessage::Error { .. }
        );

        if should_forward {
            self.inner.send(event)
        } else {
            Ok(())  // Silently drop internal events
        }
    }
}

Common filtering strategies:

  • Drop TokenUpdate if you don’t display usage statistics
  • Drop ToolExecuting if you only care about results
  • Aggregate TextChunk into larger batches for high-latency transports

Error Handling

Sinks return SendError to indicate delivery failures. Handle these errors appropriately based on their cause and your application’s requirements.

pub enum SendError {
    /// The receiving channel was closed
    ChannelClosed,
    /// Failed to serialize the event
    SerializationError(String),
    /// Transport-specific error
    TransportError(String),
}

For persistent errors like a closed channel, the agent will typically stop processing. For transient errors, consider logging and continuing rather than failing the entire session.

impl EventSink for ResilientSink {
    fn send(&self, event: UiMessage) -> Result<(), SendError> {
        match self.inner.send(event.clone()) {
            Ok(()) => Ok(()),
            Err(SendError::TransportError(e)) => {
                log::warn!("Transport error (retrying): {}", e);
                // Retry logic here
                self.inner.send(event)
            }
            Err(e) => Err(e),
        }
    }
}

Thread Safety

The EventSink trait requires Send + Sync, meaning your sink must be safe to use from multiple threads. The agent may send events from different tasks concurrently, so use appropriate synchronization.

For sinks wrapping channels, the channel sender handles synchronization. For sinks with internal state, use Mutex, RwLock, or atomic types.

pub struct StatefulSink {
    // Arc<Mutex<...>> for shared mutable state
    state: Arc<Mutex<SinkState>>,
    // Channel senders are already thread-safe
    tx: mpsc::Sender<UiMessage>,
}

impl EventSink for StatefulSink {
    fn send(&self, event: UiMessage) -> Result<(), SendError> {
        let mut state = self.state.lock().unwrap();
        state.event_count += 1;
        drop(state);  // Release lock before potentially blocking send

        self.tx.try_send(event)
            .map_err(|_| SendError::ChannelClosed)
    }
}

Testing Sinks

Test your sink implementations with mock events to verify correct serialization and delivery. The UiMessage enum can be constructed directly for testing.

#[tokio::test]
async fn test_websocket_sink() {
    let (tx, mut rx) = mpsc::channel(10);
    let sink = WebSocketSink::new(tx);

    // Send a test event
    let event = UiMessage::TextChunk {
        text: "Hello".to_string(),
        session_id: 1,
        turn_id: TurnId::new_assistant_turn(1),
    };
    sink.send(event).unwrap();

    // Verify serialization
    let json = rx.recv().await.unwrap();
    assert!(json.contains("Hello"));
    assert!(json.contains("TextChunk"));
}

Test edge cases like full buffers, closed channels, and malformed events to ensure your sink handles errors gracefully.