The streaming events system provides real-time feedback during agent execution:
- Text streaming - Token-by-token output as the LLM generates it
- Tool events - Know when tools start, complete, or fail
- Lifecycle events - Track agent steps and completion
- Guardrail events - Monitor validation progress
- Handoff events - Track agent-to-agent delegation
Quick Start
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.llm4s.agent.streaming._

// Run the agent and react to streaming events as they are emitted.
agent.runWithEvents(query, tools) {
  case TextDelta(text, _) =>
    print(text) // Real-time token output

  case ToolCallStarted(_, name, _, _) =>
    println(s"\nCalling $name...")

  case ToolCallCompleted(_, name, result, _, durationMs, _) =>
    println(s"$name completed in ${durationMs}ms")

  case AgentCompleted(state, steps, totalMs, _) =>
    println(s"\nDone in $steps steps (${totalMs}ms)")

  case _ => () // Ignore other events
}
Event Types
Text Events
| Event | Description | Fields |
|-------|-------------|--------|
| TextDelta | Token-level streaming chunk | delta, timestamp |
| TextComplete | Full text generation finished | fullText, timestamp |
1
2
3
4
5
6
7
8
// Case clauses for the text events; place them inside an event handler's match.
case TextDelta(delta, timestamp) =>
  // delta: The new text chunk
  // timestamp: When this chunk was received
  print(delta)

case TextComplete(fullText, timestamp) =>
  // fullText: Complete generated text
  println(s"\n--- Complete: ${fullText.length} chars ---")
// Case clauses for the handoff events; place them inside an event handler's match.
case HandoffStarted(targetName, reason, preserveContext, _) =>
  println(s"Handing off to $targetName: $reason")
  println(s"Context preserved: $preserveContext")

case HandoffCompleted(targetName, success, _) =>
  println(s"Handoff to $targetName: ${if (success) "success" else "failed"}")
Usage Patterns
Basic Streaming UI
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Minimal console UI: echo text chunks as they arrive, then report the outcome.
agent.runWithEvents(query, tools) { event =>
  event match {
    case TextDelta(text, _) =>
      print(text)
      System.out.flush() // print() does not add a newline, so flush after each chunk

    case AgentCompleted(_, steps, ms, _) =>
      println(s"\n\n✅ Completed in $steps steps (${ms}ms)")

    case AgentFailed(error, step, _) =>
      println(s"\n\n❌ Failed at step $step: $error")

    case _ => ()
  }
}
import org.llm4s.agent.streaming.StreamingAccumulator

// Record every event in an accumulator while still displaying text live.
val accumulator = new StreamingAccumulator()

agent.runWithEvents(query, tools) { event =>
  accumulator.record(event)

  // Also handle real-time display
  event match {
    case TextDelta(text, _) => print(text)
    case _                  => ()
  }
}

// Get metrics after completion
val metrics = accumulator.getMetrics()
println(s"Total tokens: ${metrics.tokenCount}")
println(s"Time to first token: ${metrics.timeToFirstToken}ms")
println(s"Tool calls: ${metrics.toolCallCount}")
println(s"Total duration: ${metrics.totalDuration}ms")
import java.util.concurrent.atomic.AtomicBoolean

// Flag read by the run through cancellationCheck and set by cancel().
val cancelled = new AtomicBoolean(false)

// In another thread or signal handler
def cancel(): Unit = cancelled.set(true)

agent.runWithEvents(
  query = query,
  tools = tools,
  cancellationCheck = () => cancelled.get(),
  onEvent = { event =>
    event match {
      case TextDelta(text, _) => print(text)

      // Only failures whose error text mentions "cancelled" are reported here.
      case AgentFailed(error, _, _) if error.contains("cancelled") =>
        println("\n🛑 Cancelled")

      case _ => ()
    }
  }
)
// Match on event type and forward only the events of interest.
agent.runWithEvents(query, tools) { event =>
  // Only handle text and completion events
  event match {
    case e: TextDelta      => handleText(e)
    case e: AgentCompleted => handleComplete(e)
    case _                 => () // Ignore all other events
  }
}
// Good - fast handler
agent.runWithEvents(query, tools) { event =>
  event match {
    case TextDelta(text, _) => buffer.append(text) // Fast operation
    case _                  => ()
  }
}

// Bad - slow handler blocks streaming
agent.runWithEvents(query, tools) { event =>
  event match {
    case TextDelta(text, _) => database.insert(text) // Slow I/O in handler
    case _                  => ()
  }
}
import java.util.concurrent.atomic.AtomicReference

// Accumulate streamed text and redraw only when more than 50 ms have passed
// since the previous redraw.
val textBuffer = new AtomicReference[StringBuilder](new StringBuilder)
var lastRender = System.currentTimeMillis()

agent.runWithEvents(query, tools) { event =>
  event match {
    case TextDelta(text, _) =>
      textBuffer.get().append(text)

      // Batch UI updates every 50ms
      val now = System.currentTimeMillis()
      if (now - lastRender > 50) {
        renderUI(textBuffer.get().toString)
        lastRender = now
      }

    case AgentCompleted(_, _, _, _) =>
      // Final render
      renderUI(textBuffer.get().toString)

    case _ => ()
  }
}