from timbal import Agent, Workflow
from timbal.state import get_run_context
import json
def fetch_sales() -> list[dict]:
    """Fetch raw sales data.

    Returns:
        A list of sale records, each a dict with "product", "amount",
        and "region" keys.
    """
    # Static sample data; kept as (product, amount, region) rows and
    # expanded into dicts so the record shape is defined in one place.
    rows = [
        ("Widget A", 120.00, "EU"),
        ("Widget B", 85.50, "US"),
        ("Widget A", 200.00, "US"),
        ("Widget C", 45.00, "EU"),
    ]
    return [
        {"product": product, "amount": amount, "region": region}
        for product, amount, region in rows
    ]
def aggregate(sales: list) -> dict:
    """Aggregate sales by region.

    Args:
        sales: Sale records; each must carry "region" and "amount" keys.

    Returns:
        A dict with "by_region" (region -> summed amount) and "total"
        (sum of all amounts; 0 when *sales* is empty).
    """
    totals: dict = {}
    grand_total = 0
    # Single pass: accumulate the per-region and overall totals together.
    for record in sales:
        amount = record["amount"]
        region = record["region"]
        if region not in totals:
            totals[region] = 0
        totals[region] += amount
        grand_total += amount
    return {"by_region": totals, "total": grand_total}
# Inner workflow: data preparation — fetch raw sales, then aggregate them.
# The lambda defers evaluation until the step actually runs, at which point
# it pulls the fetch_sales step's output from the run context.
data_pipeline = Workflow(name="data_pipeline")
data_pipeline = data_pipeline.step(fetch_sales)
data_pipeline = data_pipeline.step(
    aggregate,
    sales=lambda: get_run_context().step_span("fetch_sales").output,
)
# Analyst agent: turns the aggregated sales numbers into a prose summary.
analyst = Agent(
    name="analyst",
    model="openai/gpt-4.1-mini",
    system_prompt=(
        "You are a data analyst. Given sales data, "
        "write a short summary with key insights."
    ),
)
def format_email(analysis: str, data: dict) -> str:
    """Format the final email.

    Args:
        analysis: The analyst's written summary.
        data: Aggregated sales data, appended verbatim as pretty-printed JSON.

    Returns:
        The full email body, subject line included.
    """
    pretty = json.dumps(data, indent=2)
    sections = [
        "Subject: Weekly Sales Report",
        "",
        analysis,
        "",
        "Raw data:",
        pretty,
    ]
    return "\n".join(sections)
# Outer workflow: run data_pipeline as a nested step, have the analyst
# summarize its output, then format summary + raw data into an email.
# Each lambda defers the run-context lookup until its step executes.
report_pipeline = Workflow(name="report_pipeline")
report_pipeline = report_pipeline.step(data_pipeline)
report_pipeline = report_pipeline.step(
    analyst,
    prompt=lambda: json.dumps(get_run_context().step_span("data_pipeline").output),
)
report_pipeline = report_pipeline.step(
    format_email,
    analysis=lambda: get_run_context().step_span("analyst").output.collect_text(),
    data=lambda: get_run_context().step_span("data_pipeline").output,
)