The Event Stream
In the previous examples, we used .collect() to get the final result. Calling a Runnable doesn’t return the answer directly: it returns an async generator that yields events describing what is happening step by step. The .collect() method waits for all of those events and gives you just the final result.
You can iterate through the async generator to process events in real-time:
async for event in add_tool(a=5, b=3):
    print(event)
# Output:
# StartEvent(run_id="068c4458382e79bb80006dc019ac3039", ...)
# OutputEvent(run_id="068c4458382e79bb80006dc019ac3039", output=8, ...)
This enables you to:
- Monitor execution progress in real-time
- Handle streaming responses from LLMs
- Debug execution flow
- Build reactive user interfaces
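To make the relationship between the event stream and .collect() concrete, here is a minimal, framework-free sketch. The StartEvent and OutputEvent dataclasses, the add_tool generator, and the collect helper below are hypothetical stand-ins that only mimic the shape described above; they are not Timbal's implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator


# Hypothetical stand-ins for the framework's event classes (illustration only).
@dataclass
class StartEvent:
    run_id: str


@dataclass
class OutputEvent:
    run_id: str
    output: Any


async def add_tool(a: int, b: int) -> AsyncIterator[Any]:
    """Toy runnable: yields a start event, then an output event."""
    run_id = "run-1"
    yield StartEvent(run_id=run_id)
    await asyncio.sleep(0)  # pretend some async work happens here
    yield OutputEvent(run_id=run_id, output=a + b)


async def collect(events: AsyncIterator[Any]) -> OutputEvent:
    """Drain the stream and keep only the final OutputEvent."""
    final = None
    async for event in events:
        if isinstance(event, OutputEvent):
            final = event
    if final is None:
        raise RuntimeError("stream ended without an OutputEvent")
    return final


result = asyncio.run(collect(add_tool(a=5, b=3)))
print(result.output)
```

Iterating with async for gives access to every intermediate event, while a collect-style helper trades that visibility for a single awaited value.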
Event Logging
In practice, you don’t need to print events manually. By default, the framework logs events to standard output. You can control logging behavior with these environment variables:
TIMBAL_LOG_EVENTS: Which events to log (default: "START,OUTPUT")
TIMBAL_LOG_FORMAT: Log format - "dev" for human-readable or "json" for structured (default)
TIMBAL_LOG_LEVEL: Standard log level (default: "INFO")
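As a rough illustration, these variables can be set from Python before the framework is imported. Only the variable names and the "START,OUTPUT", "dev", and "INFO"-style values come from the list above; the parse_event_filter helper is hypothetical and merely shows how a comma-separated value like TIMBAL_LOG_EVENTS could be interpreted. That the framework reads these variables at import time is an assumption.

```python
import os

# Set logging preferences early so they are visible whenever the framework
# reads its configuration (the exact timing is an assumption).
os.environ["TIMBAL_LOG_EVENTS"] = "START,OUTPUT"
os.environ["TIMBAL_LOG_FORMAT"] = "dev"
os.environ["TIMBAL_LOG_LEVEL"] = "DEBUG"


def parse_event_filter(raw: str) -> set[str]:
    """Hypothetical helper: split a comma-separated list into a set of names."""
    return {name.strip().upper() for name in raw.split(",") if name.strip()}


enabled = parse_event_filter(os.environ["TIMBAL_LOG_EVENTS"])
print(sorted(enabled))
```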
Event Types
Events are the communication mechanism that Runnables use to stream information throughout their execution lifecycle. Every Runnable execution produces a sequence of events that can be consumed in real-time or collected for later processing.
Events are designed to be lightweight. For comprehensive execution data, see Traces.
Every execution produces at least a Start event (when it begins) and an Output event (when it finishes). LLMs and streaming operations also produce Chunk or Delta events for intermediate results.
The following examples show what these events look like for an LLM interaction:
Start Event
Signals the beginning of an execution.
from timbal.types.events import StartEvent
start_event = StartEvent(
    run_id="068c4458382e79bb80006dc019ac3039",
    parent_run_id=None,
    path="agent.llm",
    call_id="068c4458383678be800031537a8df42e",
    parent_call_id="068c4458382e708f8000cc0f9b19d970",
)
These fields are present in all events to identify their source - the framework handles this automatically. We’ll explore this in greater detail in advanced sections.
Chunk Event
Contains streaming or intermediate results during execution. The chunk property contains each piece of the response as it’s generated during streaming.
from timbal.types.events import ChunkEvent
chunk_event = ChunkEvent(
    run_id="068c4458382e79bb80006dc019ac3039",
    parent_run_id=None,
    path="agent.llm",
    call_id="068c4458383678be800031537a8df42e",
    parent_call_id="068c4458382e708f8000cc0f9b19d970",
    chunk="Hello",
)
Deprecation Notice: ChunkEvents are simple and untyped, and will be deprecated in a future release in favor of Delta Events. Delta Events provide structured, typed streaming with semantic information about different content types (text, tool calls, thinking) and better observability. We recommend migrating to Delta Events for new projects.
Delta Events
For advanced use cases requiring fine-grained control over streaming content, Timbal provides Delta Events. Unlike simple ChunkEvents, DeltaEvents provide typed, structured information about different types of streaming content.
Delta events are opt-in via the environment variable:
export TIMBAL_DELTA_EVENTS=true
Delta Event Structure
Each DeltaEvent contains an item property with a typed delta item. All delta items inherit from DeltaItem and have id and type fields:
from timbal.types.events import DeltaEvent
from timbal.types.events.delta import Text, TextDelta, ToolUse
# Text block start (complete text content)
text_event = DeltaEvent(
    run_id="068c4458382e79bb80006dc019ac3039",
    path="agent.llm",
    call_id="068c4458383678be800031537a8df42e",
    item=Text(id="text_0", text="Hello"),
)
# Text streaming delta
text_delta_event = DeltaEvent(
    run_id="068c4458382e79bb80006dc019ac3039",
    path="agent.llm",
    call_id="068c4458383678be800031537a8df42e",
    item=TextDelta(id="text_0", text_delta=" world"),
)
# Tool call start
tool_event = DeltaEvent(
    run_id="068c4458382e79bb80006dc019ac3039",
    path="agent.llm",
    call_id="068c4458383678be800031537a8df42e",
    item=ToolUse(id="call_123", name="search", input=""),
)
Delta Item Types
DeltaEvents use a pattern of paired types: a “start” type for the beginning of a content block, and a “delta” type for streaming incremental updates. All items have an id field to correlate deltas with their parent block.
Text - Start of a text content block:
Text(
    id="text_0",
    text="Hello",  # Initial text content
)
TextDelta - Streaming text increments:
TextDelta(
    id="text_0",
    text_delta=" world",  # Incremental text
)
ToolUse - Start of a tool call:
ToolUse(
    id="call_abc123",
    name="search_web",
    input="",  # Initially empty
    is_server_tool_use=False,
)
ToolUseDelta - Streaming tool input parameters:
ToolUseDelta(
    id="call_abc123",
    input_delta='{"query',  # Partial JSON
)
LLMs stream tool input parameters incrementally. Multiple ToolUseDelta events will be emitted for a single tool call. You need to accumulate these deltas and parse the complete JSON afterwards.
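The accumulation step described above can be sketched as follows. The ToolUseDelta dataclass here is a hypothetical stand-in that copies only the id and input_delta fields shown above, and assemble_tool_inputs is an illustrative helper, not part of the framework. The key point is that fragments are joined per id and parsed only once the stream is complete, because a partial fragment such as '{"query' is not valid JSON on its own.

```python
import json
from dataclasses import dataclass


# Hypothetical stand-in mirroring the ToolUseDelta fields shown above.
@dataclass
class ToolUseDelta:
    id: str
    input_delta: str


def assemble_tool_inputs(deltas):
    """Concatenate input_delta fragments per tool call id, then parse each as JSON."""
    buffers: dict[str, list[str]] = {}
    for delta in deltas:
        buffers.setdefault(delta.id, []).append(delta.input_delta)
    # Parse only after all fragments have arrived; partial JSON is not valid.
    return {call_id: json.loads("".join(parts)) for call_id, parts in buffers.items()}


stream = [
    ToolUseDelta(id="call_abc123", input_delta='{"query'),
    ToolUseDelta(id="call_abc123", input_delta='": "timbal'),
    ToolUseDelta(id="call_abc123", input_delta=' events"}'),
]
inputs = assemble_tool_inputs(stream)
print(inputs)
```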
Thinking - Start of LLM reasoning block:
Thinking(
    id="thinking_0",
    thinking="Let me analyze this problem...",
)
ThinkingDelta - Streaming thinking increments:
ThinkingDelta(
    id="thinking_0",
    thinking_delta="First, I'll consider...",
)
Custom - Arbitrary custom content:
Custom(
    id="custom_0",
    data={"type": "image_progress", "percent": 45},
)
Use Custom for streaming content that doesn’t fit standard types (e.g., multimodal content, provider-specific features, experimental data). This allows custom collectors and tools to emit arbitrary typed data while participating in the delta event system.
ContentBlockStop - Signals end of a content block:
ContentBlockStop(id="text_0")
This event signals that a content block (text, tool use, thinking) has finished streaming. Use it to finalize processing of accumulated deltas.
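One way to use the stop signal is to keep a buffer per block id and move it to a "finished" map once the matching ContentBlockStop arrives. The sketch below assumes the field names shown above; the three dataclasses are hypothetical stand-ins and finalize_text_blocks is an illustrative helper, not a framework function.

```python
from dataclasses import dataclass


# Hypothetical stand-ins mirroring the delta item fields shown above.
@dataclass
class Text:
    id: str
    text: str


@dataclass
class TextDelta:
    id: str
    text_delta: str


@dataclass
class ContentBlockStop:
    id: str


def finalize_text_blocks(items):
    """Return {block_id: full_text} for every block that received a stop item."""
    open_blocks: dict[str, str] = {}
    finished: dict[str, str] = {}
    for item in items:
        if isinstance(item, Text):
            open_blocks[item.id] = item.text
        elif isinstance(item, TextDelta):
            open_blocks[item.id] = open_blocks.get(item.id, "") + item.text_delta
        elif isinstance(item, ContentBlockStop) and item.id in open_blocks:
            finished[item.id] = open_blocks.pop(item.id)
    return finished


items = [
    Text(id="text_0", text="Hello"),
    TextDelta(id="text_0", text_delta=" world"),
    ContentBlockStop(id="text_0"),
    Text(id="text_1", text="unfinished"),  # never stopped, so never finalized
]
done = finalize_text_blocks(items)
print(done)
```

Blocks that never receive a stop item stay out of the result, which makes it easy to distinguish complete content from content that was cut off mid-stream.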
Using Delta Events
from timbal.types.events import DeltaEvent
from timbal.types.events.delta import (
    Text, TextDelta, ToolUse, ToolUseDelta,
    Thinking, ThinkingDelta, Custom, ContentBlockStop,
)
async for event in agent(input="Hello"):
    if isinstance(event, DeltaEvent):
        item = event.item
        # Type-safe handling of different content types
        if isinstance(item, Text):
            print(item.text, end="", flush=True)
        elif isinstance(item, TextDelta):
            print(item.text_delta, end="", flush=True)
        elif isinstance(item, ToolUse):
            print(f"\n[Calling {item.name}]")
        elif isinstance(item, ToolUseDelta):
            # Accumulate input_delta for later JSON parsing
            pass
        elif isinstance(item, Thinking):
            print(f"\n[Thinking: {item.thinking}]")
        elif isinstance(item, ThinkingDelta):
            print(item.thinking_delta, end="", flush=True)
        elif isinstance(item, Custom):
            print(f"\n[Custom: {item.data}]")
        elif isinstance(item, ContentBlockStop):
            print(f"\n[Block {item.id} complete]")
Benefits of Delta Events
- Type Safety: Each delta item has specific fields and types
- Semantic Information: Know exactly what type of content is streaming
- Better Observability: Track tool calls, thinking, and text separately
- Block Lifecycle: Start and stop events for each content block
- UI Flexibility: Render different content types with appropriate components
- Structured Logging: Monitor different content types independently
Backward Compatibility
Delta Events are opt-in. When TIMBAL_DELTA_EVENTS is disabled (default):
- Text content is emitted as ChunkEvent for backward compatibility
- Other delta types (tool calls, thinking) are filtered out
- Existing code continues to work unchanged
When enabled:
- All content is emitted as structured DeltaEvent instances
- No ChunkEvent instances for LLM text output
- Full observability into all streaming content types
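Code that must run under either setting can normalize both event styles into plain text. The following is a minimal sketch of that idea; the dataclasses are hypothetical stand-ins carrying only the chunk, item, text, and text_delta fields described in this section, and extract_text is an illustrative helper rather than a framework function.

```python
from dataclasses import dataclass
from typing import Any, Optional


# Hypothetical stand-ins for the event and item classes described above.
@dataclass
class ChunkEvent:
    chunk: Any


@dataclass
class Text:
    id: str
    text: str


@dataclass
class TextDelta:
    id: str
    text_delta: str


@dataclass
class DeltaEvent:
    item: Any


def extract_text(event) -> Optional[str]:
    """Return the text carried by an event, or None if it carries no text."""
    if isinstance(event, ChunkEvent):
        return event.chunk if isinstance(event.chunk, str) else None
    if isinstance(event, DeltaEvent):
        if isinstance(event.item, Text):
            return event.item.text
        if isinstance(event.item, TextDelta):
            return event.item.text_delta
    return None


legacy = [ChunkEvent(chunk="Hello"), ChunkEvent(chunk=" world")]
delta = [
    DeltaEvent(item=Text(id="text_0", text="Hello")),
    DeltaEvent(item=TextDelta(id="text_0", text_delta=" world")),
]

legacy_text = "".join(t for t in map(extract_text, legacy) if t is not None)
delta_text = "".join(t for t in map(extract_text, delta) if t is not None)
print(legacy_text == delta_text)
```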
Output Event
Signals completion and carries the final result: either the output (if the execution succeeded) or error information (if something went wrong).
from timbal.types.events import OutputEvent
output_event = OutputEvent(
    run_id="068c4458382e79bb80006dc019ac3039",
    parent_run_id=None,
    path="agent.llm",
    call_id="068c4458383678be800031537a8df42e",
    parent_call_id="068c4458382e708f8000cc0f9b19d970",
    output={
        "role": "assistant",
        "content": [{"type": "text", "text": "Hello! How can I assist you today?"}],
    },
    error=None,
)
Handling Errors
When a Runnable encounters an error during execution, it won’t raise an exception. Instead, it will always return an OutputEvent with error information. This ensures the event stream continues and you can handle errors gracefully:
try:
    result = await runnable(**kwargs).collect()
except Exception as e:
    # This won't happen - errors are in the OutputEvent
    pass
# Result will always be an instance of OutputEvent
if result.error:
    print(f"Error: {result.error['message']}")
Errors will always have this structure:
error = {
    "type": "ValueError",
    "message": "Invalid input provided",
    "traceback": "Traceback (most recent call last):\n...",
}
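For illustration, a dictionary with this shape can be produced from any Python exception using the standard traceback module. The error_to_dict and describe helpers below are hypothetical; this is a sketch of how such a structure might be built and consumed, not a description of how the framework does it internally.

```python
import traceback


def error_to_dict(exc: BaseException) -> dict:
    """Hypothetical helper: serialize an exception into type/message/traceback."""
    return {
        "type": type(exc).__name__,
        "message": str(exc),
        "traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


def describe(error: dict) -> str:
    """Format an error dict as a one-line summary for logs or UIs."""
    return f"{error['type']}: {error['message']}"


try:
    raise ValueError("Invalid input provided")
except ValueError as exc:
    error = error_to_dict(exc)

print(describe(error))  # ValueError: Invalid input provided
```

Because the error is plain data rather than a live exception object, it can be logged, serialized to JSON, or shown in a UI without special handling.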