LLM4S Streaming Implementation Plan
Executive Overview
This document outlines the implementation plan for adding streaming support to the LLM4S library. Currently, the library has only placeholder implementations for streaming in the OpenAI and Anthropic clients. This plan provides a comprehensive approach to implementing real streaming functionality that is simple for users, robust, and maintainable.
Goals
- Implement real-time streaming for OpenAI and Anthropic providers
- Provide a simple, callback-based API for users
- Maintain cross-compatibility with Scala 2.13 and 3
- Ensure robust error handling and fallback mechanisms
- Create comprehensive test coverage
Non-Goals (Future Enhancements)
- WebSocket support
- Reactive Streams API
- Server-side streaming proxy
- Advanced token tracking during streaming
Technical Architecture
Core Components
1. SSE (Server-Sent Events) Parser
Handles parsing of the SSE format used by both OpenAI and Anthropic streaming endpoints.
Key Features:
- Parse `data:` and `event:` fields
- Handle multi-line data
- Buffer incomplete messages
- Support both providers’ formats
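To make the intended behaviour concrete, here is a minimal sketch of the parser core, assuming a simple `SSEEvent(event, data)` case class; the names and buffering details are illustrative rather than the final LLM4S API.

```scala
// Sketch only: a line-oriented SSE parsing core. SSEEvent and the buffering
// strategy are assumptions for illustration, not the shipped LLM4S API.
final case class SSEEvent(event: Option[String], data: String)

object SSEParser {
  /** Parse one complete event block (the lines between blank-line separators). */
  def parseEvent(block: String): Option[SSEEvent] = {
    val lines     = block.split("\n").toList.filterNot(_.startsWith(":")) // drop comment lines
    val eventName = lines.collectFirst { case l if l.startsWith("event:") => l.drop(6).trim }
    val dataLines = lines.collect { case l if l.startsWith("data:") => l.drop(5).stripPrefix(" ") }
    if (dataLines.isEmpty) None
    else Some(SSEEvent(eventName, dataLines.mkString("\n"))) // multi-line data joined with newlines
  }

  /** Split buffered stream text into complete events plus the incomplete remainder. */
  def parseStream(buffer: String): (List[SSEEvent], String) = {
    val parts = buffer.split("\n\n", -1)
    (parts.init.toList.flatMap(parseEvent), parts.last)
  }
}
```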
2. Streaming Response Handler
Manages the streaming lifecycle and chunk accumulation.
Key Features:
- Accumulate content from chunks
- Detect completion signals
- Handle errors gracefully
- Manage resource cleanup
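One plausible shape for this component, assuming the existing `StreamedChunk`, `Completion`, and `LLMError` models; the method names below are illustrative, not final.

```scala
import org.llm4s.llmconnect.model._ // assuming these types live under model._; adjust to the actual packages

// Sketch of the handler's lifecycle surface; names are illustrative only.
trait StreamingResponseHandler {
  def onChunk(chunk: StreamedChunk): Unit      // accumulate content as it arrives
  def onError(error: LLMError): Unit           // surface failures without leaking resources
  def onComplete(completion: Completion): Unit // fired once the terminal signal is seen
  def close(): Unit                            // cleanup, guaranteed even on failure paths
}
```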
3. Provider Implementations
Each provider will have specific streaming logic while sharing common infrastructure.
OpenAI:
- Support delta message format
- Handle tool calls in streaming context
- Use Azure SDK streaming if available, fallback to HTTP
Anthropic:
- Support message event format
- Handle content blocks
- Use HTTP client with SSE parsing
API Design
Simple Callback API
```scala
client.streamComplete(
  conversation,
  options,
  onChunk = (chunk: StreamedChunk) => {
    print(chunk.content.getOrElse(""))
  }
)
```
Enhanced Streaming Options
```scala
case class StreamingOptions(
  onStart: () => Unit = () => (),
  onChunk: StreamedChunk => Unit,
  onError: LLMError => Unit = _ => (),
  onComplete: Completion => Unit = _ => ()
)
```
Implementation Checklist
Phase 1: Core Infrastructure ✅
SSE Parser (src/main/scala/org/llm4s/llmconnect/streaming/SSEParser.scala)
- Create SSEParser object
- Implement parseEvent method for single SSE event
- Implement parseStream method for continuous parsing
- Handle multi-line data fields
- Support comment lines (starting with `:`)
- Add error handling for malformed events
- Create unit tests for various SSE formats
Streaming Response Handler (src/main/scala/org/llm4s/llmconnect/streaming/StreamingResponseHandler.scala)
- Create StreamingResponseHandler trait
- Implement chunk accumulation logic
- Add completion detection
- Implement error handling and recovery
- Add resource cleanup methods
- Create unit tests for handler logic
Streaming Accumulator (src/main/scala/org/llm4s/llmconnect/streaming/StreamingAccumulator.scala)
- Create StreamingAccumulator class
- Implement content accumulation
- Handle tool call accumulation
- Track token usage if available
- Provide methods to get final completion
- Add unit tests for accumulation scenarios
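A rough sketch of the accumulator's core, matching the `create`/`addChunk`/`getCurrentContent` calls used in the examples later in this document; tool-call accumulation, usage tracking, and thread safety are deliberately omitted here.

```scala
import org.llm4s.llmconnect.model.StreamedChunk // assumed location of StreamedChunk

// Sketch only: accumulate streamed text and expose the partial result so far.
// Tool-call handling, usage tracking and synchronisation are elided.
final class StreamingAccumulator {
  private val builder = new StringBuilder

  def addChunk(chunk: StreamedChunk): Unit =
    chunk.content.foreach(c => builder.append(c)) // assumes content: Option[String]

  def getCurrentContent(): String = builder.toString
}

object StreamingAccumulator {
  def create(): StreamingAccumulator = new StreamingAccumulator
}
```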
Streaming Options (src/main/scala/org/llm4s/llmconnect/streaming/StreamingOptions.scala)
- Define StreamingOptions case class
- Add callback definitions
- Provide default implementations
- Add builder pattern for convenience
- Document callback semantics
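The builder mentioned in the checklist could look roughly like this; the `with*` method names are assumptions for illustration, not the shipped API.

```scala
import org.llm4s.llmconnect.model._ // assumed locations of StreamedChunk, LLMError, Completion
import org.llm4s.llmconnect.streaming.StreamingOptions

// Hypothetical builder over the StreamingOptions case class shown earlier;
// the with* method names are illustrative only.
final case class StreamingOptionsBuilder(
  onStart: () => Unit = () => (),
  onChunk: StreamedChunk => Unit = _ => (),
  onError: LLMError => Unit = _ => (),
  onComplete: Completion => Unit = _ => ()
) {
  def withOnStart(f: () => Unit): StreamingOptionsBuilder            = copy(onStart = f)
  def withOnChunk(f: StreamedChunk => Unit): StreamingOptionsBuilder = copy(onChunk = f)
  def withOnError(f: LLMError => Unit): StreamingOptionsBuilder      = copy(onError = f)
  def withOnComplete(f: Completion => Unit): StreamingOptionsBuilder = copy(onComplete = f)
  def build(): StreamingOptions = StreamingOptions(onStart, onChunk, onError, onComplete)
}
```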
Phase 2: OpenAI Implementation ✅
Update OpenAIClient (src/main/scala/org/llm4s/llmconnect/provider/OpenAIClient.scala)
- Check if Azure SDK supports streaming (YES - getChatCompletionsStream)
- Implement SDK-based streaming (using native Azure SDK)
- Parse OpenAI SSE format (SDK handles this)
- Handle delta messages
- Accumulate content from deltas
- Handle tool calls in streaming
- Add error handling
- Compile and test successfully
- Implement connection retry logic
- Add integration tests
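For orientation, a hedged sketch of consuming the Azure SDK's synchronous streaming call; deployment naming, option wiring and null handling are simplified, and the exact model classes should be verified against the SDK version actually on the classpath.

```scala
import com.azure.ai.openai.OpenAIClient
import com.azure.ai.openai.models.{ChatCompletionsOptions, ChatRequestMessage, ChatRequestUserMessage}
import scala.jdk.CollectionConverters._

// Sketch only: iterate the SDK's streaming response and emit each text delta.
// Verify class and method names against the azure-ai-openai version in use.
def streamWithAzureSdk(client: OpenAIClient, deployment: String, prompt: String)(onText: String => Unit): Unit = {
  val options = new ChatCompletionsOptions(java.util.List.of[ChatRequestMessage](new ChatRequestUserMessage(prompt)))
  client.getChatCompletionsStream(deployment, options).asScala.foreach { update =>
    update.getChoices.asScala.headOption.foreach { choice =>
      Option(choice.getDelta).flatMap(d => Option(d.getContent)).foreach(onText) // delta fields may be null
    }
  }
}
```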
OpenAI Streaming Parser
- Parse `choices[].delta` format (SDK handles)
- Handle `role` field in first chunk
- Accumulate `content` deltas
- Handle `function_call` deltas
- Detect `finish_reason`
- Parse usage statistics if present
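To illustrate the delta format itself (which the SDK handles internally), here is a hedged sketch of folding the JSON payloads carried in each SSE `data:` line into the final text; ujson is used purely for illustration and is not necessarily the JSON library LLM4S uses.

```scala
// Sketch: fold OpenAI-style streaming payloads (the JSON in each SSE `data:` line)
// into the assembled assistant message. ujson is an illustrative choice only.
def accumulateDeltas(payloads: Seq[String]): String =
  payloads
    .takeWhile(_.trim != "[DONE]")                             // the stream terminates with `data: [DONE]`
    .foldLeft(new StringBuilder) { (acc, payload) =>
      val delta = ujson.read(payload)("choices")(0)("delta")
      delta.obj.get("content").foreach(c => acc.append(c.str)) // `content` is absent in the first, role-only chunk
      acc
    }
    .toString
```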
Phase 3: Anthropic Implementation ✅
Update AnthropicClient (src/main/scala/org/llm4s/llmconnect/provider/AnthropicClient.scala)
- Implement streaming using SDK (createStreaming method)
- Parse Anthropic event format (SDK handles)
- Handle message events
- Process content blocks
- Handle tool use blocks
- Accumulate text content
- Add error handling
- Fix compilation issues with event type checking
- Handle Java Optional types correctly
- Implement connection retry logic
- Add integration tests
Anthropic Streaming Parser
- Parse `message_start` event (SDK handles)
- Handle `content_block_start` events
- Process `content_block_delta` events
- Handle `content_block_stop` events (implicit)
- Process `message_delta` events
- Detect `message_stop` event
- Extract usage information
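For reference, a hedged sketch of dispatching on the raw event names above (the SDK's typed events wrap the same wire shapes); only text deltas are accumulated, tool-use blocks and usage extraction are elided, and ujson is again just an illustrative choice.

```scala
// Sketch: handle one decoded Anthropic streaming event, given its SSE `event:` name
// and the parsed `data:` JSON. Only text deltas are accumulated here.
def handleAnthropicEvent(eventName: String, data: ujson.Value, acc: StringBuilder): Unit =
  eventName match {
    case "message_start"       => ()      // carries the message id and model
    case "content_block_start" => ()      // announces a new text or tool_use block
    case "content_block_delta" =>
      val delta = data("delta")
      if (delta("type").str == "text_delta") acc.append(delta("text").str)
    case "content_block_stop"  => ()      // block finished (implicit in accumulation)
    case "message_delta"       => ()      // stop_reason and output usage live here
    case "message_stop"        => ()      // terminal event: emit the final completion
    case _                     => ()      // ignore pings and unknown events
  }
```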
Phase 4: OpenRouter Implementation ✅
Update OpenRouterClient (src/main/scala/org/llm4s/llmconnect/provider/OpenRouterClient.scala)
- Enhance existing HTTP client usage
- Add streaming endpoint support
- Reuse OpenAI SSE parser
- Handle provider-specific quirks
- Add integration tests
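A hedged sketch of the HTTP side using the JDK's built-in `java.net.http` client (the project's actual HTTP client may differ), emitting each `data:` payload to a callback that can feed the shared SSE/delta handling.

```scala
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

// Sketch: open an OpenRouter chat-completions stream and hand each `data:` payload
// to a callback. The endpoint and headers follow the OpenAI-compatible convention,
// but treat the details (and the JDK HttpClient choice) as illustrative.
def streamOpenRouter(apiKey: String, requestBody: String)(onData: String => Unit): Unit = {
  val request = HttpRequest.newBuilder()
    .uri(URI.create("https://openrouter.ai/api/v1/chat/completions"))
    .header("Authorization", s"Bearer $apiKey")
    .header("Content-Type", "application/json")
    .POST(HttpRequest.BodyPublishers.ofString(requestBody)) // body must set "stream": true
    .build()

  val response = HttpClient.newHttpClient()
    .send(request, HttpResponse.BodyHandlers.ofLines())     // consume the body line by line

  response.body().forEach { line =>
    if (line.startsWith("data:")) {
      val payload = line.drop(5).trim
      if (payload.nonEmpty && payload != "[DONE]") onData(payload) // `[DONE]` is the terminal sentinel
    }
  }
}
```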
Phase 5: Testing (Partial) ⚠️
Unit Tests
SSEParserTest.scala
- Test basic SSE parsing
- Test multi-line data
- Test comment handling
- Test error cases
StreamingAccumulatorTest.scala
- Test content accumulation
- Test tool call handling
- Test completion generation
StreamingResponseHandlerTest.scala
- Test lifecycle management
- Test error scenarios
- Test resource cleanup
Integration Tests
OpenAIStreamingTest.scala
- Mock streaming responses
- Test complete streaming flow
- Test error scenarios
- Test interruption handling
AnthropicStreamingTest.scala
- Mock streaming responses
- Test complete streaming flow
- Test error scenarios
- Test interruption handling
OpenRouterStreamingTest.scala
- Mock streaming responses
- Test various model behaviors
Example Applications
BasicStreamingExample.scala
- Simple streaming usage
- Console output example
StreamingWithProgressExample.scala
- Progress bar implementation
- Token counting example
StreamingErrorHandlingExample.scala
- Error recovery patterns
- Retry logic demonstration
StreamingAccumulatorExample.scala
- Using accumulator helper
- Building complete response
Phase 6: Documentation
API Documentation
- Document streamComplete method
- Document StreamedChunk structure
- Document StreamingOptions
- Add ScalaDoc comments
- Include usage examples in docs
README Updates
- Add streaming section to README
- Include quick start example
- Document provider-specific notes
- Add troubleshooting section
Migration Guide
- Document changes from placeholder
- Provide upgrade instructions
- Note any breaking changes
- Include fallback strategies
Testing Strategy
Unit Testing
Focus on testing individual components in isolation:
- SSE parsing with various formats
- Chunk accumulation logic
- Error handling scenarios
- Resource management
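An illustrative unit test in the spirit of SSEParserTest above, assuming ScalaTest (the project's actual test framework may differ) and the hypothetical `SSEParser.parseEvent`/`SSEEvent` shapes sketched in Phase 1.

```scala
import org.scalatest.funsuite.AnyFunSuite

// Illustrative only: exercises multi-line data and comment handling against the
// hypothetical SSEParser sketched earlier; adapt to the real API.
class SSEParserTest extends AnyFunSuite {
  test("joins multi-line data fields and ignores comment lines") {
    val block  = ": keep-alive\nevent: message\ndata: hello\ndata: world"
    val parsed = SSEParser.parseEvent(block)
    assert(parsed.contains(SSEEvent(Some("message"), "hello\nworld")))
  }
}
```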
Integration Testing
Test the complete streaming flow with mocked responses:
- Full streaming lifecycle
- Error recovery
- Network interruption handling
- Rate limiting scenarios
Manual Testing
- Test with real API keys (behind feature flag)
- Verify with different models
- Test long-running streams
- Monitor memory usage
- Test concurrent streams
Performance Testing
- Measure streaming latency
- Monitor memory consumption
- Test with large responses
- Verify connection pooling
Implementation Timeline
Week 1
- Core infrastructure implementation
- SSE parser and tests
- Response handler framework
Week 2
- OpenAI implementation
- Anthropic implementation
- Basic integration tests
Week 3
- OpenRouter enhancement
- Comprehensive testing
- Documentation
- Examples
Code Examples
Basic Usage
```scala
import org.llm4s.llmconnect.LLM
import org.llm4s.llmconnect.model._

val client = LLM.client()

val conversation = Conversation(Seq(
  UserMessage("Write a story about a robot")
))

client.streamComplete(
  conversation,
  options = CompletionOptions(),
  onChunk = chunk => {
    chunk.content.foreach(print)
  }
)
```
Advanced Usage with Error Handling
```scala
import org.llm4s.llmconnect.streaming.StreamingOptions

val streamingOpts = StreamingOptions(
  onStart = () => println("Starting stream..."),
  onChunk = chunk => print(chunk.content.getOrElse("")),
  onError = error => println(s"Error: ${error.message}"),
  onComplete = completion => println(s"\nDone! Tokens: ${completion.usage}")
)

client.streamCompleteWithOptions(conversation, streamingOpts)
```
Using Accumulator
```scala
import org.llm4s.llmconnect.streaming.StreamingAccumulator

val accumulator = StreamingAccumulator.create()

client.streamComplete(
  conversation,
  onChunk = chunk => {
    accumulator.addChunk(chunk)
    // Update UI with partial content
    updateUI(accumulator.getCurrentContent())
  }
) match {
  case Right(completion) =>
    println(s"Final: ${completion.message.content}")
  case Left(error) =>
    println(s"Error: ${error.message}")
}
```
Risk Mitigation
Technical Risks
- SDK Limitations: Azure SDK may not support streaming
  - Mitigation: Implement HTTP fallback
- API Changes: Provider APIs may change
  - Mitigation: Abstract provider-specific logic
- Network Issues: Connection drops during streaming
  - Mitigation: Implement reconnection logic
Performance Risks
- Memory Usage: Large streams consuming memory
  - Mitigation: Process chunks immediately, limit buffering
- Thread Blocking: Blocking I/O affecting performance
  - Mitigation: Use non-blocking HTTP client
Success Criteria
- Streaming works for OpenAI models (using native SDK)
- Streaming works for Anthropic models (using native SDK)
- Streaming works for OpenRouter (using HTTP SSE)
- All tests pass
- Documentation is complete
- Examples run successfully
- No performance regression
- Backwards compatibility maintained
Future Enhancements
- Reactive Streams API: For advanced async processing
- WebSocket Support: For bidirectional communication
- Streaming Proxy Server: For browser-based clients
- Token Tracking: Real-time token counting
- Partial Caching: Cache incomplete responses
- Stream Transformation: Modify streams in-flight
- Multiplexing: Share streams across multiple consumers
Notes
- All implementations must maintain cross-compatibility with Scala 2.13 and 3
- Error handling should always provide useful error messages
- Resource cleanup must be guaranteed even on failures
- Performance should be optimized for real-time output
- The API should remain simple for basic use cases
Implementation Status
Last Updated: August 17, 2025
Completed ✅
- Core streaming infrastructure (SSE parser, accumulator, response handlers)
- Native SDK streaming for OpenAI/Azure using `getChatCompletionsStream`
- Native SDK streaming for Anthropic using `createStreaming`
- HTTP-based SSE streaming for OpenRouter
- Basic unit tests for accumulator and SSE parser
- Example applications demonstrating streaming usage
Key Decisions Made
- Used native SDK streaming methods instead of custom SSE implementation for OpenAI and Anthropic
- This leverages provider-specific optimizations and reduces maintenance burden
- Kept SSE parser for OpenRouter which uses HTTP-based streaming
- Created unified `StreamedChunk` model for consistency across providers
Known Issues
- Anthropic SDK event types don’t follow the expected inheritance hierarchy (using `isInstanceOf` checks instead of pattern matching)
- Some SDK methods return Java `Optional` types requiring special handling
- Compilation warnings about unreachable cases due to SDK type structure
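On the Java `Optional` point, the conversion can be kept contained with the standard `scala.jdk.OptionConverters`; the accessor below is a stand-in, not a real SDK method.

```scala
import java.util.Optional
import scala.jdk.OptionConverters._

// Convert SDK Optionals to Scala Options at the boundary so downstream code stays
// idiomatic. The parameter stands in for whatever Optional-returning accessor the
// SDK exposes; it is not a specific method name.
def stopReasonOf(sdkValue: Optional[String]): Option[String] =
  sdkValue.toScala
```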
Next Steps
- Add comprehensive integration tests
- Implement connection retry logic
- Complete documentation updates
- Add more example applications
- Performance benchmarking
This document tracks the implementation of streaming support in LLM4S. The feature is now functional but requires additional testing and documentation.