A reporting pipeline that uses an inner data workflow as a step. The inner workflow handles data fetching and cleaning, while the outer workflow generates and delivers the report.

Workflow

pipeline.py
from timbal import Agent, Workflow
from timbal.state import get_run_context
import json


def fetch_sales() -> list[dict]:
    """Fetch raw sales data."""
    return [
        {"product": "Widget A", "amount": 120.00, "region": "EU"},
        {"product": "Widget B", "amount": 85.50, "region": "US"},
        {"product": "Widget A", "amount": 200.00, "region": "US"},
        {"product": "Widget C", "amount": 45.00, "region": "EU"},
    ]


def aggregate(sales: list) -> dict:
    """Aggregate sales by region."""
    by_region = {}
    for sale in sales:
        region = sale["region"]
        by_region[region] = by_region.get(region, 0) + sale["amount"]
    return {
        "by_region": by_region,
        "total": sum(sale["amount"] for sale in sales),
    }


# Inner workflow: data preparation
data_pipeline = (
    Workflow(name="data_pipeline")
    .step(fetch_sales)
    # The lambda is evaluated when this step runs, after fetch_sales has produced its output
    .step(aggregate,
        sales=lambda: get_run_context().step_span("fetch_sales").output)
)


analyst = Agent(
    name="analyst",
    model="openai/gpt-4.1-mini",
    system_prompt="You are a data analyst. Given sales data, write a short summary with key insights."
)


def format_email(analysis: str, data: dict) -> str:
    """Format the final email."""
    return f"Subject: Weekly Sales Report\n\n{analysis}\n\nRaw data:\n{json.dumps(data, indent=2)}"


# Outer workflow: uses data_pipeline as a step
report_pipeline = (
    Workflow(name="report_pipeline")
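    .step(data_pipeline)  # the whole inner workflow runs as a single step
    .step(analyst,
        # Serialize the inner workflow's output (the aggregate dict) as the prompt
        prompt=lambda: json.dumps(get_run_context().step_span("data_pipeline").output))
    .step(format_email,
        # Take the analyst's reply as plain text
        analysis=lambda: get_run_context().step_span("analyst").output.collect_text(),
        data=lambda: get_run_context().step_span("data_pipeline").output)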
)
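Every step except analyst is an ordinary Python function, so the deterministic part of the pipeline can be checked without Timbal or a model call. The sketch below copies fetch_sales, aggregate, and format_email from pipeline.py, chains them by hand, and passes a hard-coded placeholder string (an assumption for illustration) where the analyst's reply would normally go.

```python
import json


def fetch_sales() -> list[dict]:
    """Fetch raw sales data (same fixture as pipeline.py)."""
    return [
        {"product": "Widget A", "amount": 120.00, "region": "EU"},
        {"product": "Widget B", "amount": 85.50, "region": "US"},
        {"product": "Widget A", "amount": 200.00, "region": "US"},
        {"product": "Widget C", "amount": 45.00, "region": "EU"},
    ]


def aggregate(sales: list) -> dict:
    """Aggregate sales by region."""
    by_region = {}
    for sale in sales:
        region = sale["region"]
        by_region[region] = by_region.get(region, 0) + sale["amount"]
    return {
        "by_region": by_region,
        "total": sum(sale["amount"] for sale in sales),
    }


def format_email(analysis: str, data: dict) -> str:
    """Format the final email."""
    return f"Subject: Weekly Sales Report\n\n{analysis}\n\nRaw data:\n{json.dumps(data, indent=2)}"


# Chain the steps by hand, in the same order the workflows wire them.
data = aggregate(fetch_sales())
email = format_email("PLACEHOLDER ANALYSIS", data)  # stand-in for the LLM's reply

print(data)
print(email.splitlines()[0])
```

This kind of check isolates failures: if the email's raw data section is wrong, the problem is in the plain functions, not in the workflow wiring or the model.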

How It Works

┌─ data_pipeline ──────────┐
│  fetch_sales → aggregate │
└───────┬──────────────────┘
        │
        ├─→ analyst
        │       │
        └───────┴─→ format_email
  1. data_pipeline (inner workflow) — fetches and aggregates sales data as a single step
  2. analyst — LLM analyzes the aggregated data (waits for data_pipeline)
  3. format_email — combines the LLM analysis and raw data into a report (waits for both data_pipeline and analyst)
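The ordering in the list above can be modelled with Python's standard-library graphlib.TopologicalSorter. This is only a model of the dependencies, not a description of how Timbal schedules steps internally; the dependency map is written by hand from the step_span(...) references in pipeline.py.

```python
from graphlib import TopologicalSorter

# Each outer step mapped to the steps whose output it reads via step_span(...).
dependencies = {
    "data_pipeline": set(),
    "analyst": {"data_pipeline"},
    "format_email": {"data_pipeline", "analyst"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['data_pipeline', 'analyst', 'format_email']
```

Because analyst depends on data_pipeline and format_email depends on both, the order is fully determined: format_email cannot start until analyst finishes, even though it also reads data_pipeline directly.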
The inner workflow's final output (the dict returned by aggregate) becomes the output of the data_pipeline step, which the outer workflow reads with get_run_context().step_span("data_pipeline").output.
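To make the nesting rule concrete, here is a toy model in plain Python. MiniWorkflow, fetch_amounts, summarize, and report are hypothetical names, not part of Timbal: the class runs its steps in order, records each step's output by name, and returns the last step's output, so one MiniWorkflow can be passed as a step to another. Keyword arguments are zero-argument lambdas resolved only when the step runs, loosely mirroring the deferred lambda arguments in pipeline.py.

```python
class MiniWorkflow:
    """Hypothetical toy workflow (not Timbal): runs steps in order and
    returns the last step's output, so it can itself be used as a step."""

    def __init__(self, name: str):
        self.name = name
        self.steps = []    # (step name, callable, deferred keyword arguments)
        self.outputs = {}  # step name -> that step's output for the current run

    def step(self, fn, **params):
        # Nested workflows are named by .name, plain functions by __name__.
        name = fn.name if isinstance(fn, MiniWorkflow) else fn.__name__
        self.steps.append((name, fn, params))
        return self  # enables .step(...).step(...) chaining

    def __call__(self):
        self.outputs = {}
        result = None
        for name, fn, params in self.steps:
            # Deferred arguments are resolved only now, after earlier steps ran.
            kwargs = {key: getter() for key, getter in params.items()}
            result = fn(**kwargs)
            self.outputs[name] = result
        return result  # the last step's output is the workflow's output


def fetch_amounts() -> list[float]:
    return [120.0, 85.5, 200.0, 45.0]


def summarize(values: list[float]) -> float:
    return sum(values)


def report(amount: float) -> str:
    return f"Total: {amount:.2f}"


# Inner workflow: its result is summarize's output.
inner = MiniWorkflow("data").step(fetch_amounts).step(
    summarize, values=lambda: inner.outputs["fetch_amounts"]
)

# Outer workflow: uses `inner` as a single step named "data".
outer = MiniWorkflow("report").step(inner).step(
    report, amount=lambda: outer.outputs["data"]
)

print(outer())  # Total: 450.50
```

The outer workflow never sees fetch_amounts directly; it only sees the single "data" step, whose output is whatever the inner workflow's last step returned.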

Running

# Run from an async context (e.g. inside an async function or a notebook cell)
result = await report_pipeline().collect()
print(result.output)
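The output will be similar to the following. The summary paragraph is generated by the model and varies between runs; the raw data section is produced deterministically by json.dumps: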
Subject: Weekly Sales Report

The total sales for this period amount to $450.50. The US region leads with
$285.50 (63%), while the EU region accounts for $165.00 (37%). Widget A is the
top performer with $320.00 across both regions.

Raw data:
{
  "by_region": {
    "EU": 165.0,
    "US": 285.5
  },
  "total": 450.5
}
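Because the summary comes from the model, its figures are worth checking against the raw data. The sketch below recomputes the regional shares and per-product totals that the sample summary quotes; the per-product aggregation (by_product) is an extra calculation the pipeline itself does not perform.

```python
from collections import defaultdict

# Same fixture as fetch_sales in pipeline.py.
sales = [
    {"product": "Widget A", "amount": 120.00, "region": "EU"},
    {"product": "Widget B", "amount": 85.50, "region": "US"},
    {"product": "Widget A", "amount": 200.00, "region": "US"},
    {"product": "Widget C", "amount": 45.00, "region": "EU"},
]

total = sum(s["amount"] for s in sales)

# Regional share of the total, rounded to whole percentages as in the summary.
by_region = defaultdict(float)
for s in sales:
    by_region[s["region"]] += s["amount"]
shares = {region: round(100 * amount / total) for region, amount in by_region.items()}

# Per-product totals (not part of the pipeline's aggregate output).
by_product = defaultdict(float)
for s in sales:
    by_product[s["product"]] += s["amount"]
top_product = max(by_product, key=by_product.get)

print(total, shares, top_product, by_product[top_product])
```

Here the numbers agree with the sample summary (US 63%, EU 37%, Widget A at $320.00), but a different model run could misstate them, so a check like this is a cheap guard before delivering the report.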