Workflows orchestrate steps as a directed acyclic graph (DAG): you define what runs, and Timbal infers the dependencies and runs independent steps in parallel. Use them when the path is predictable (ETL, gated deploys, fan-out/fan-in, LLM routing) rather than leaving every decision to the model.

Workflow vs Agent

            | Workflow                                      | Agent
Control     | You define steps and order                    | The model decides what to do
Best for    | Pipelines, batch jobs, deterministic routing  | Open-ended reasoning, tool picking
Parallelism | Built-in across independent steps             | Sequential tool loop
Composition | Nest workflows, mix agents + functions        | Tools and sub-agents
They share the same interface: call a Runnable to get an event stream, or chain .collect() for the final OutputEvent. A common pattern is a workflow that routes to agents (classify → specialist agent). See Agents & LLMs.

Reading guide

  1. Control flow — parallel execution, step_span(), depends_on, wiring data between steps
  2. Branching — conditional steps with when
  3. Errors & failure — what happens when a step fails or is skipped
  4. Agents & LLMs — agents as steps, LLM routing patterns
  5. Human in the loop — approval gates on workflow steps
  6. Examples below — copy-paste pipelines for common shapes

Quickstart

import asyncio

from timbal import Workflow
from timbal.state import get_run_context


def celsius_to_fahrenheit(celsius: float) -> float:
    return (celsius * 9 / 5) + 32


def format_result(temperature: float) -> str:
    return f"Temperature: {temperature}°F"


workflow = (
    Workflow(name="temperature_converter")
    .step(celsius_to_fahrenheit, celsius=35)
    .step(
        format_result,
        temperature=lambda: get_run_context().step_span("celsius_to_fahrenheit").output,
    )
)


async def main():
    result = await workflow().collect()
    print(result.output)


asyncio.run(main())
Functions used as steps must accept and return Pydantic-serializable types (str, int, float, bool, dict, list, BaseModel). Custom classes that aren’t Pydantic models cannot be passed between steps.

Adding steps

Use .step() to add a Runnable (function, Tool, Agent, or nested Workflow):
from timbal import Tool, Workflow


# Illustrative handler (an assumption; the original snippet does not define it).
def check_threshold(value: float, limit: float) -> bool:
    return value > limit


# Same handler twice? Give each Tool a unique name.
threshold_high = Tool(name="threshold_high", handler=check_threshold)
threshold_low = Tool(name="threshold_low", handler=check_threshold)

workflow = (
    Workflow(name="monitoring")
    .step(threshold_high, value=80, limit=100)
    .step(threshold_low, value=80, limit=50)
)
Each step name must be unique within the workflow. Timbal detects cycles when you link steps and raises if the graph would loop.
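
To make the two rules above concrete, here is a minimal plain-Python sketch of a step graph that rejects duplicate names and refuses any link that would close a loop. It is not Timbal's implementation: StepGraph, add_step, and link are hypothetical names used only for illustration, and the real library may detect cycles differently.

```python
class StepGraph:
    """Toy step graph: unique step names, links rejected if they create a cycle."""

    def __init__(self) -> None:
        # Maps each step name to the set of steps that run after it.
        self.successors: dict[str, set[str]] = {}

    def add_step(self, name: str) -> None:
        if name in self.successors:
            raise ValueError(f"duplicate step name: {name!r}")
        self.successors[name] = set()

    def link(self, source: str, target: str) -> None:
        """Declare that `target` runs after `source`."""
        if source not in self.successors or target not in self.successors:
            raise KeyError("both steps must be added before linking")
        # The new edge source -> target closes a loop exactly when
        # `source` is already reachable from `target`.
        if self._reaches(target, source):
            raise ValueError(f"linking {source!r} -> {target!r} would create a cycle")
        self.successors[source].add(target)

    def _reaches(self, start: str, goal: str) -> bool:
        # Iterative depth-first search over the existing edges.
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == goal:
                return True
            if node not in seen:
                seen.add(node)
                stack.extend(self.successors[node])
        return False


graph = StepGraph()
for step_name in ("fetch", "clean", "report"):
    graph.add_step(step_name)
graph.link("fetch", "clean")
graph.link("clean", "report")

try:
    graph.link("report", "fetch")  # would close fetch -> clean -> report -> fetch
except ValueError as exc:
    print(exc)
```

Checking at link time, rather than at run time, means a malformed graph fails as soon as it is built.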

Workflow inputs

Keyword arguments passed to the workflow are forwarded to steps. Parameters without a default on a step become workflow-level inputs (merged into the workflow’s params schema):
workflow = (
    Workflow(name="scraper")
    .step(fetch)          # fetch(url: str) → url is a workflow input
    .step(process, raw=lambda: get_run_context().step_span("fetch").output)
)

result = await workflow(url="https://example.com").collect()
Fixed values passed as .step(runnable, key=value) are defaults for that step only and are not exposed as workflow inputs.
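
The input rule can be approximated with the standard library's inspect module. The sketch below shows only the idea and is not how Timbal builds its params schema: workflow_inputs is a hypothetical helper, and the timeout parameter on fetch is added purely to show a parameter with a default.

```python
import inspect


def workflow_inputs(step_fn, fixed: dict) -> list[str]:
    """Names of parameters the caller must still supply for `step_fn`."""
    required = []
    for name, param in inspect.signature(step_fn).parameters.items():
        # *args / **kwargs never become named inputs in this sketch.
        if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
            continue
        has_default = param.default is not inspect.Parameter.empty
        if name not in fixed and not has_default:
            required.append(name)
    return required


def fetch(url: str, timeout: float = 10.0) -> str:
    return f"<html from {url}>"


# No fixed values: `url` has no default, so it surfaces as an input.
print(workflow_inputs(fetch, fixed={}))  # ['url']

# A fixed value binds `url` for this step, so nothing is left to supply.
print(workflow_inputs(fetch, fixed={"url": "https://example.com"}))  # []
```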

Output

The workflow returns the last executed step’s output. If you need multiple step results, add a final merge step:
workflow = (
    Workflow(name="pipeline")
    .step(fetch_users)
    .step(fetch_orders)
    .step(
        build_report,
        users=lambda: get_run_context().step_span("fetch_users").output,
        orders=lambda: get_run_context().step_span("fetch_orders").output,
    )
)
# build_report's return value is the workflow output
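
For intuition, the same fan-in shape can be written with plain asyncio, independent of Timbal. In this sketch the sample data and the run_pipeline function are assumptions; the point is only that the two fetches have no dependency on each other and that the merge step, which runs last, determines the result.

```python
import asyncio


async def fetch_users() -> list[str]:
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return ["ada", "grace"]


async def fetch_orders() -> list[int]:
    await asyncio.sleep(0.01)  # stand-in for real I/O
    return [101, 102, 103]


def build_report(users: list[str], orders: list[int]) -> dict:
    return {"user_count": len(users), "order_count": len(orders)}


async def run_pipeline() -> dict:
    # The two fetches do not depend on each other, so they run concurrently.
    users, orders = await asyncio.gather(fetch_users(), fetch_orders())
    # The merge step runs last, so its return value is the pipeline's output.
    return build_report(users=users, orders=orders)


report = asyncio.run(run_pipeline())
print(report)  # {'user_count': 2, 'order_count': 3}
```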

Running

# Collect final OutputEvent
result = await workflow(url="https://example.com").collect()
print(result.output)
print(result.status.code)   # "success" | "error" | "cancelled"
print(result.usage)

# Or stream events from every step
async for event in workflow(url="https://example.com"):
    print(event)
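
One way to picture how a single call can support both styles is an object that is async-iterable and also offers collect(). The toy model below is an assumption for illustration only: Event and Run are hypothetical classes, and Timbal's real event types carry more information (such as status and usage) than is modelled here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Callable, Optional


@dataclass
class Event:
    kind: str  # "start" or "output" in this toy model
    step: str
    output: Any = None


class Run:
    """Toy run handle: iterate it for every event, or collect() for the last one."""

    def __init__(self, steps: dict[str, Callable[[], Any]]) -> None:
        self._steps = steps

    async def __aiter__(self) -> AsyncIterator[Event]:
        # Emit a start event and an output event for each step, in order.
        for name, fn in self._steps.items():
            yield Event("start", name)
            yield Event("output", name, fn())

    async def collect(self) -> Optional[Event]:
        # Drain the stream and keep only the final event.
        last = None
        async for event in self:
            last = event
        return last


async def main() -> None:
    steps = {"double": lambda: 21 * 2}

    async for event in Run(steps):  # streaming style
        print(event.kind, event.step)

    final = await Run(steps).collect()  # collect style
    print(final.output)  # 42


asyncio.run(main())
```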

Composition

Nest workflows as steps. The inner workflow’s final output becomes that step’s output:
data_pipeline = (
    Workflow(name="data_pipeline")
    .step(fetch_data, source="api")
    .step(clean_data, raw=lambda: get_run_context().step_span("fetch_data").output)
)

report_pipeline = (
    Workflow(name="report_pipeline")
    .step(data_pipeline)
    .step(generate_report, data=lambda: get_run_context().step_span("data_pipeline").output)
)
See Workflow Composition for a full example.
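
The nesting rule can be sketched without Timbal: if a pipeline is itself callable, it can stand in wherever a step is expected, and its final output is what the outer pipeline sees. Pipeline and the function bodies below are hypothetical stand-ins, and the sketch is purely sequential, so it does not model Timbal's parallelism or step_span() lookups.

```python
from typing import Any, Callable


class Pipeline:
    """Toy sequential pipeline whose instances are themselves callable steps."""

    def __init__(self, name: str, *steps: Callable[[Any], Any]) -> None:
        self.name = name
        self.steps = steps

    def __call__(self, value: Any) -> Any:
        # Feed each step's output into the next; the last output is the result.
        for step in self.steps:
            value = step(value)
        return value


def fetch_data(source: str) -> list[str]:
    return [f" {source}-row-1 ", f" {source}-row-2 "]


def clean_data(raw: list[str]) -> list[str]:
    return [row.strip() for row in raw]


def generate_report(data: list[str]) -> str:
    return f"{len(data)} rows: {', '.join(data)}"


data_pipeline = Pipeline("data_pipeline", fetch_data, clean_data)
# The inner pipeline is used exactly like a function step.
report_pipeline = Pipeline("report_pipeline", data_pipeline, generate_report)

print(report_pipeline("api"))  # 2 rows: api-row-1, api-row-2
```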

Examples

Sequential steps

Chain steps with data passing between them

Parallel fan-out

Fetch from multiple sources concurrently, then merge

Conditional routing

Route to different handlers based on validation

Workflow composition

Nest an inner workflow as a step