LLM4S Streaming Implementation Plan

Executive Overview

This document outlines the implementation plan for adding streaming support to the LLM4S library. Currently, the library has placeholder implementations for streaming in the OpenAI and Anthropic clients. This plan provides a comprehensive approach to implementing real streaming functionality that is simple for users, robust, and maintainable.

Goals

  • Implement real-time streaming for OpenAI and Anthropic providers
  • Provide a simple, callback-based API for users
  • Maintain cross-compilation for Scala 2.13 and Scala 3
  • Ensure robust error handling and fallback mechanisms
  • Create comprehensive test coverage

Non-Goals (Future Enhancements)

  • WebSocket support
  • Reactive Streams API
  • Server-side streaming proxy
  • Advanced token tracking during streaming

Technical Architecture

Core Components

1. SSE (Server-Sent Events) Parser

Handles parsing of the SSE format used by both OpenAI and Anthropic streaming endpoints.

Key Features:

  • Parse data: and event: fields
  • Handle multi-line data
  • Buffer incomplete messages
  • Support both providers’ formats
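
For reference, the two wire formats look roughly like this (abbreviated; real payloads carry additional fields). OpenAI emits bare data: lines terminated by data: [DONE], while Anthropic pairs each event: line with a data: line:

data: {"choices":[{"delta":{"content":"Hel"}}]}

data: {"choices":[{"delta":{"content":"lo"}}]}

data: [DONE]

event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {"type":"message_stop"}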

2. Streaming Response Handler

Manages the streaming lifecycle and chunk accumulation.

Key Features:

  • Accumulate content from chunks
  • Detect completion signals
  • Handle errors gracefully
  • Manage resource cleanup

3. Provider Implementations

Each provider will have specific streaming logic while sharing common infrastructure.

OpenAI:

  • Support delta message format
  • Handle tool calls in streaming context
  • Use Azure SDK streaming if available, falling back to HTTP otherwise

Anthropic:

  • Support message event format
  • Handle content blocks
  • Use HTTP client with SSE parsing

API Design

Simple Callback API

client.streamComplete(
  conversation,
  options,
  onChunk = (chunk: StreamedChunk) => {
    print(chunk.content.getOrElse(""))
  }
)

Enhanced Streaming Options

case class StreamingOptions(
  onStart: () => Unit = () => (),
  onChunk: StreamedChunk => Unit,
  onError: LLMError => Unit = _ => (),
  onComplete: Completion => Unit = _ => ()
)

Implementation Checklist

Phase 1: Core Infrastructure ✅

SSE Parser (src/main/scala/org/llm4s/llmconnect/streaming/SSEParser.scala)

  • Create SSEParser object
  • Implement parseEvent method for single SSE event
  • Implement parseStream method for continuous parsing
  • Handle multi-line data fields
  • Support comment lines (starting with :)
  • Add error handling for malformed events
  • Create unit tests for various SSE formats
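
As a sketch of the intended behavior (names taken from the checklist above; the actual implementation may differ), parseEvent could look like this:

object SSEParser {
  final case class SSEEvent(event: Option[String], data: String)

  // Parses one complete SSE event block (the lines between two blank lines).
  def parseEvent(block: String): Option[SSEEvent] = {
    val lines = block.split("\n").toList.filterNot(_.startsWith(":")) // drop comment lines
    val event = lines.collectFirst { case l if l.startsWith("event:") => l.drop(6).trim }
    val data  = lines.collect { case l if l.startsWith("data:") => l.drop(5).trim }
    if (data.isEmpty) None                          // no data field: nothing to emit
    else Some(SSEEvent(event, data.mkString("\n"))) // multi-line data joined per the SSE spec
  }
}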

Streaming Response Handler (src/main/scala/org/llm4s/llmconnect/streaming/StreamingResponseHandler.scala)

  • Create StreamingResponseHandler trait
  • Implement chunk accumulation logic
  • Add completion detection
  • Implement error handling and recovery
  • Add resource cleanup methods
  • Create unit tests for handler logic
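
A minimal sketch of the trait; the callback names mirror the public API, while the exact signatures are illustrative:

trait StreamingResponseHandler {
  def onChunk(chunk: StreamedChunk): Unit      // invoked for every parsed chunk
  def onComplete(completion: Completion): Unit // invoked once, after the final chunk
  def onError(error: LLMError): Unit           // invoked on failure; close() must still run
  def close(): Unit                            // releases connections/buffers; idempotent
}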

Streaming Accumulator (src/main/scala/org/llm4s/llmconnect/streaming/StreamingAccumulator.scala)

  • Create StreamingAccumulator class
  • Implement content accumulation
  • Handle tool call accumulation
  • Track token usage if available
  • Provide methods to get final completion
  • Add unit tests for accumulation scenarios
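
A sketch of the accumulator's shape, assuming StreamedChunk carries optional content and an optional finish reason (the usage example later in this document relies on the same interface):

class StreamingAccumulator {
  private val contentBuf = new StringBuilder
  private var finishReason: Option[String] = None

  def addChunk(chunk: StreamedChunk): Unit = {
    chunk.content.foreach(s => contentBuf.append(s))        // append each text delta
    chunk.finishReason.foreach(r => finishReason = Some(r)) // remember completion signal
  }

  def getCurrentContent(): String = contentBuf.toString
  def isComplete: Boolean = finishReason.isDefined
}

object StreamingAccumulator {
  def create(): StreamingAccumulator = new StreamingAccumulator
}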

Streaming Options (src/main/scala/org/llm4s/llmconnect/streaming/StreamingOptions.scala)

  • Define StreamingOptions case class
  • Add callback definitions
  • Provide default implementations
  • Add builder pattern for convenience
  • Document callback semantics
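
The builder could be layered onto the case class from the API design section as copy-based with* methods, for example:

case class StreamingOptions(
  onStart: () => Unit = () => (),
  onChunk: StreamedChunk => Unit,
  onError: LLMError => Unit = _ => (),
  onComplete: Completion => Unit = _ => ()
) {
  // Builder-style refinement without mutation, via case-class copy.
  def withOnStart(f: () => Unit): StreamingOptions = copy(onStart = f)
  def withOnError(f: LLMError => Unit): StreamingOptions = copy(onError = f)
  def withOnComplete(f: Completion => Unit): StreamingOptions = copy(onComplete = f)
}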

Phase 2: OpenAI Implementation ✅

Update OpenAIClient (src/main/scala/org/llm4s/llmconnect/provider/OpenAIClient.scala)

  • Check if Azure SDK supports streaming (YES - getChatCompletionsStream)
  • Implement SDK-based streaming (using native Azure SDK)
  • Parse OpenAI SSE format (SDK handles this)
  • Handle delta messages
  • Accumulate content from deltas
  • Handle tool calls in streaming
  • Add error handling
  • Compile and test successfully
  • Implement connection retry logic
  • Add integration tests

OpenAI Streaming Parser

  • Parse choices[].delta format (SDK handles)
  • Handle role field in first chunk
  • Accumulate content deltas
  • Handle function_call deltas
  • Detect finish_reason
  • Parse usage statistics if present
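
A sketch of the SDK-based loop, assuming an already-configured com.azure.ai.openai.OpenAIClient and a hypothetical deployment name; delta fields can be null mid-stream, so they are wrapped in Option:

import com.azure.ai.openai.models.{ChatCompletionsOptions, ChatRequestMessage, ChatRequestUserMessage}
import scala.jdk.CollectionConverters._

// azureClient: com.azure.ai.openai.OpenAIClient, configured elsewhere
val chatOptions = new ChatCompletionsOptions(
  java.util.List.of[ChatRequestMessage](new ChatRequestUserMessage("Write a story about a robot"))
)

azureClient
  .getChatCompletionsStream("gpt-4o", chatOptions) // IterableStream[ChatCompletions]
  .forEach { completions =>
    completions.getChoices.asScala.foreach { choice =>
      Option(choice.getDelta)               // null on bookkeeping chunks
        .flatMap(d => Option(d.getContent)) // null when the delta carries no text
        .foreach(print)
    }
  }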

Phase 3: Anthropic Implementation ✅

Update AnthropicClient (src/main/scala/org/llm4s/llmconnect/provider/AnthropicClient.scala)

  • Implement streaming using SDK (createStreaming method)
  • Parse Anthropic event format (SDK handles)
  • Handle message events
  • Process content blocks
  • Handle tool use blocks
  • Accumulate text content
  • Add error handling
  • Fix compilation issues with event type checking
  • Handle Java Optional types correctly
  • Implement connection retry logic
  • Add integration tests

Anthropic Streaming Parser

  • Parse message_start event (SDK handles)
  • Handle content_block_start events
  • Process content_block_delta events
  • Handle content_block_stop events (implicit)
  • Process message_delta events
  • Detect message_stop event
  • Extract usage information
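
A sketch of the SDK loop; the accessor names are approximations (the union-typed events and Java Optionals are exactly the friction noted under Known Issues), so verify them against the SDK version in use:

import scala.util.Using

// client: com.anthropic.client.AnthropicClient; params: MessageCreateParams built elsewhere
Using.resource(client.messages().createStreaming(params)) { response =>
  response.stream().forEach { event =>
    // Events form a union type; only content-block text deltas carry streamed tokens.
    event.contentBlockDelta().ifPresent { blockDelta =>
      blockDelta.delta().text().ifPresent(textDelta => print(textDelta.text()))
    }
  }
}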

Phase 4: OpenRouter Implementation ✅

Update OpenRouterClient (src/main/scala/org/llm4s/llmconnect/provider/OpenRouterClient.scala)

  • Enhance existing HTTP client usage
  • Add streaming endpoint support
  • Reuse OpenAI SSE parser
  • Handle provider-specific quirks
  • Add integration tests
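
At the HTTP level this is the standard OpenAI-compatible flow: POST with "stream": true, then read data: lines until [DONE]. A minimal sketch with java.net.http, where apiKey and handleJsonChunk stand in for the client's configuration and chunk handling:

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

val body =
  """{"model":"openai/gpt-4o-mini","messages":[{"role":"user","content":"Hi"}],"stream":true}"""

val request = HttpRequest.newBuilder()
  .uri(URI.create("https://openrouter.ai/api/v1/chat/completions"))
  .header("Authorization", s"Bearer $apiKey") // apiKey supplied by configuration
  .header("Content-Type", "application/json")
  .POST(HttpRequest.BodyPublishers.ofString(body))
  .build()

HttpClient.newHttpClient()
  .send(request, HttpResponse.BodyHandlers.ofLines())
  .body()                                     // java.util.stream.Stream[String] of SSE lines
  .forEach { line =>
    if (line.startsWith("data:")) {
      val payload = line.drop(5).trim
      if (payload != "[DONE]") handleJsonChunk(payload) // hypothetical downstream handler
    }
  }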

Phase 5: Testing (Partial) ⚠️

Unit Tests

  • SSEParserTest.scala
    • Test basic SSE parsing
    • Test multi-line data
    • Test comment handling
    • Test error cases
  • StreamingAccumulatorTest.scala
    • Test content accumulation
    • Test tool call handling
    • Test completion generation
  • StreamingResponseHandlerTest.scala
    • Test lifecycle management
    • Test error scenarios
    • Test resource cleanup
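
For example, the multi-line and comment cases in SSEParserTest might look like this (assuming ScalaTest and the parseEvent sketch above):

import org.scalatest.funsuite.AnyFunSuite

class SSEParserTest extends AnyFunSuite {
  test("multi-line data fields are joined with newlines") {
    val parsed = SSEParser.parseEvent("data: first\ndata: second")
    assert(parsed.exists(_.data == "first\nsecond"))
  }

  test("comment lines are ignored") {
    val parsed = SSEParser.parseEvent(": keep-alive\ndata: hello")
    assert(parsed.exists(_.data == "hello"))
  }
}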

Integration Tests

  • OpenAIStreamingTest.scala
    • Mock streaming responses
    • Test complete streaming flow
    • Test error scenarios
    • Test interruption handling
  • AnthropicStreamingTest.scala
    • Mock streaming responses
    • Test complete streaming flow
    • Test error scenarios
    • Test interruption handling
  • OpenRouterStreamingTest.scala
    • Mock streaming responses
    • Test various model behaviors

Example Applications

  • BasicStreamingExample.scala
    • Simple streaming usage
    • Console output example
  • StreamingWithProgressExample.scala
    • Progress bar implementation
    • Token counting example
  • StreamingErrorHandlingExample.scala
    • Error recovery patterns
    • Retry logic demonstration
  • StreamingAccumulatorExample.scala
    • Using accumulator helper
    • Building complete response

Phase 6: Documentation

API Documentation

  • Document streamComplete method
  • Document StreamedChunk structure
  • Document StreamingOptions
  • Add ScalaDoc comments
  • Include usage examples in docs

README Updates

  • Add streaming section to README
  • Include quick start example
  • Document provider-specific notes
  • Add troubleshooting section

Migration Guide

  • Document changes from placeholder
  • Provide upgrade instructions
  • Note any breaking changes
  • Include fallback strategies

Testing Strategy

Unit Testing

Focus on testing individual components in isolation:

  • SSE parsing with various formats
  • Chunk accumulation logic
  • Error handling scenarios
  • Resource management

Integration Testing

Test the complete streaming flow with mocked responses:

  • Full streaming lifecycle
  • Error recovery
  • Network interruption handling
  • Rate limiting scenarios

Manual Testing

  • Test with real API keys (behind feature flag)
  • Verify with different models
  • Test long-running streams
  • Monitor memory usage
  • Test concurrent streams

Performance Testing

  • Measure streaming latency
  • Monitor memory consumption
  • Test with large responses
  • Verify connection pooling

Implementation Timeline

Week 1

  • Core infrastructure implementation
  • SSE parser and tests
  • Response handler framework

Week 2

  • OpenAI implementation
  • Anthropic implementation
  • Basic integration tests

Week 3

  • OpenRouter enhancement
  • Comprehensive testing
  • Documentation
  • Examples

Code Examples

Basic Usage

import org.llm4s.llmconnect.LLM
import org.llm4s.llmconnect.model._

val client = LLM.client()
val conversation = Conversation(Seq(
  UserMessage("Write a story about a robot")
))

client.streamComplete(
  conversation,
  options = CompletionOptions(),
  onChunk = chunk => {
    chunk.content.foreach(print)
  }
)

Advanced Usage with Error Handling

import org.llm4s.llmconnect.streaming.StreamingOptions

val streamingOpts = StreamingOptions(
  onStart = () => println("Starting stream..."),
  onChunk = chunk => print(chunk.content.getOrElse("")),
  onError = error => println(s"Error: ${error.message}"),
  onComplete = completion => println(s"\nDone! Tokens: ${completion.usage}")
)

client.streamCompleteWithOptions(conversation, streamingOpts)

Using Accumulator

import org.llm4s.llmconnect.streaming.StreamingAccumulator

val accumulator = StreamingAccumulator.create()

client.streamComplete(
  conversation,
  onChunk = chunk => {
    accumulator.addChunk(chunk)
    // Update UI with partial content (updateUI is application-defined)
    updateUI(accumulator.getCurrentContent())
  }
) match {
  case Right(completion) => 
    println(s"Final: ${completion.message.content}")
  case Left(error) =>
    println(s"Error: ${error.message}")
}

Risk Mitigation

Technical Risks

  1. SDK Limitations: Azure SDK may not support streaming (resolved: it exposes getChatCompletionsStream)
    • Mitigation: Implement HTTP fallback
  2. API Changes: Provider APIs may change
    • Mitigation: Abstract provider-specific logic
  3. Network Issues: Connection drops during streaming
    • Mitigation: Implement reconnection logic (a retry sketch follows this list)
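
A minimal exponential-backoff sketch; real reconnection logic would also restrict retries to transient network errors and decide whether a partially streamed response can be resumed or must be restarted:

import scala.annotation.tailrec

@tailrec
def withRetries[A](attemptsLeft: Int, delayMs: Long)(op: () => Either[LLMError, A]): Either[LLMError, A] =
  op() match {
    case Left(_) if attemptsLeft > 1 =>
      Thread.sleep(delayMs)                          // simple blocking backoff
      withRetries(attemptsLeft - 1, delayMs * 2)(op) // double the delay each attempt
    case result => result
  }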

Performance Risks

  1. Memory Usage: Large streams consuming memory
    • Mitigation: Process chunks immediately, limit buffering
  2. Thread Blocking: Blocking I/O affecting performance
    • Mitigation: Use non-blocking HTTP client

Success Criteria

  • Streaming works for OpenAI models (using native SDK)
  • Streaming works for Anthropic models (using native SDK)
  • Streaming works for OpenRouter (using HTTP SSE)
  • All tests pass
  • Documentation is complete
  • Examples run successfully
  • No performance regression
  • Backwards compatibility maintained

Future Enhancements

  1. Reactive Streams API: For advanced async processing
  2. WebSocket Support: For bidirectional communication
  3. Streaming Proxy Server: For browser-based clients
  4. Token Tracking: Real-time token counting
  5. Partial Caching: Cache incomplete responses
  6. Stream Transformation: Modify streams in-flight
  7. Multiplexing: Share streams across multiple consumers

Notes

  • All implementations must cross-compile for Scala 2.13 and Scala 3
  • Error handling should always provide useful error messages
  • Resource cleanup must be guaranteed even on failures
  • Performance should be optimized for real-time output
  • The API should remain simple for basic use cases

Implementation Status

Last Updated: August 17, 2025

Completed ✅

  • Core streaming infrastructure (SSE parser, accumulator, response handlers)
  • Native SDK streaming for OpenAI/Azure using getChatCompletionsStream
  • Native SDK streaming for Anthropic using createStreaming
  • HTTP-based SSE streaming for OpenRouter
  • Basic unit tests for accumulator and SSE parser
  • Example applications demonstrating streaming usage

Key Decisions Made

  • Used native SDK streaming methods instead of custom SSE implementation for OpenAI and Anthropic
  • This leverages provider-specific optimizations and reduces maintenance burden
  • Kept SSE parser for OpenRouter which uses HTTP-based streaming
  • Created unified StreamedChunk model for consistency across providers (sketched below)
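
The exact fields live in the model package; as an illustration of the shape the examples in this document rely on (content is confirmed by the usage examples, the other fields are representative assumptions):

case class StreamedChunk(
  content: Option[String]      = None, // incremental text delta, if any
  toolCall: Option[ToolCall]   = None, // partial tool-call data (assumed field)
  finishReason: Option[String] = None  // set on the final chunk (assumed field)
)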

Known Issues

  • Anthropic SDK event types don’t follow expected inheritance hierarchy (using isInstanceOf checks instead of pattern matching)
  • Some SDK methods return Java Optional types requiring special handling
  • Compilation warnings about unreachable cases due to SDK type structure

Next Steps

  1. Add comprehensive integration tests
  2. Implement connection retry logic
  3. Complete documentation updates
  4. Add more example applications
  5. Performance benchmarking

This document tracks the implementation of streaming support in LLM4S. The feature is now functional but requires additional testing and documentation.