Skip to main content
Multiple independent data sources are fetched concurrently, then a final step merges everything into a single report.

Workflow

pipeline.py
from timbal import Workflow
from timbal.state import get_run_context


def fetch_users() -> list[dict]:
    """Fetch user data."""
    return [
        {"id": 1, "name": "Alice", "role": "admin"},
        {"id": 2, "name": "Bob", "role": "editor"},
    ]


def fetch_orders() -> list[dict]:
    """Fetch order data."""
    return [
        {"id": 101, "user_id": 1, "total": 59.99},
        {"id": 102, "user_id": 2, "total": 29.99},
        {"id": 103, "user_id": 1, "total": 14.50},
    ]


def fetch_inventory() -> list[dict]:
    """Fetch inventory data."""
    return [
        {"product": "Widget A", "stock": 150},
        {"product": "Widget B", "stock": 0},
    ]


def build_report(users: list, orders: list, inventory: list) -> dict:
    """Merge all data into a summary report."""
    total_revenue = sum(o["total"] for o in orders)
    out_of_stock = [i["product"] for i in inventory if i["stock"] == 0]
    return {
        "total_users": len(users),
        "total_orders": len(orders),
        "revenue": total_revenue,
        "out_of_stock": out_of_stock,
    }


pipeline = (
    Workflow(name="dashboard_report")
    .step(fetch_users)
    .step(fetch_orders)
    .step(fetch_inventory)
    .step(build_report,
        users=lambda: get_run_context().step_span("fetch_users").output,
        orders=lambda: get_run_context().step_span("fetch_orders").output,
        inventory=lambda: get_run_context().step_span("fetch_inventory").output,
    )
)

How It Works

fetch_users    ─┐
fetch_orders   ─┼─→ build_report
fetch_inventory─┘
  1. fetch_users, fetch_orders, and fetch_inventory run in parallel — no dependencies between them
  2. build_report waits for all three to complete, then merges their outputs

Running

result = await pipeline().collect()
print(result.output)
The output will be:
{
  "total_users": 2,
  "total_orders": 3,
  "revenue": 104.48,
  "out_of_stock": ["Widget B"]
}