Build multi-step AI pipelines with explicit control flow
Workflows orchestrate steps in a DAG: you define what runs; Timbal infers the dependencies and runs independent steps in parallel. Use them when the path is predictable (ETL, gated deploys, fan-out/fan-in, LLM routing) rather than leaving every decision to the model.
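To make the DAG idea concrete, here is a minimal sketch of dependency-ordered execution using only `asyncio`. It is not Timbal's scheduler: the step names, the `DEPENDENCIES` table, and the wave-by-wave strategy are all assumptions chosen for illustration.

```python
import asyncio

# Hypothetical step graph: each step lists the steps whose output it needs.
DEPENDENCIES = {
    "fetch_a": [],
    "fetch_b": [],
    "merge": ["fetch_a", "fetch_b"],
    "report": ["merge"],
}


async def run_step(name: str, inputs: dict) -> str:
    """Stand-in for real work; returns a string naming the step and its inputs."""
    await asyncio.sleep(0.01)
    return f"{name}({','.join(sorted(inputs))})"


async def run_dag(dependencies: dict) -> tuple:
    """Run steps wave by wave; steps inside one wave run concurrently."""
    results = {}
    waves = []
    pending = dict(dependencies)
    while pending:
        # A step is ready once every step it depends on has produced a result.
        ready = sorted(
            name for name, deps in pending.items()
            if all(dep in results for dep in deps)
        )
        if not ready:
            raise ValueError("cycle detected in step graph")
        outputs = await asyncio.gather(
            *(run_step(name, {dep: results[dep] for dep in pending[name]})
              for name in ready)
        )
        results.update(zip(ready, outputs))
        waves.append(ready)
        for name in ready:
            del pending[name]
    return results, waves


results, waves = asyncio.run(run_dag(DEPENDENCIES))
print(waves)
```

In this sketch `fetch_a` and `fetch_b` share a wave because neither depends on the other, which is the fan-out/fan-in shape mentioned above. A production scheduler would more likely start each step as soon as its own dependencies finish rather than waiting for a whole wave.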
Workflows and agents share the same interface: call a Runnable to get an event stream, or chain .collect() to get the final OutputEvent. A common pattern is a workflow that routes to agents (classify → specialist agent). See Agents & LLMs.
Functions used as steps must accept and return Pydantic-serializable types (str, int, float, bool, dict, list, BaseModel). Custom classes that aren’t Pydantic models cannot be passed between steps.
Keyword arguments passed to the workflow are forwarded to steps. Parameters without a default on a step become workflow-level inputs (merged into the workflow’s params schema):
workflow = (
    Workflow(name="scraper")
    .step(fetch)  # fetch(url: str) → url is a workflow input
    .step(process, raw=lambda: get_run_context().step_span("fetch").output)
)

result = await workflow(url="https://example.com").collect()
Fixed values passed in .step(name, key=value) are defaults for that step only and are not exposed as workflow inputs.
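The rule for which parameters surface as workflow inputs can be approximated with `inspect.signature`. The sketch below is an illustration of the rule as described here, not Timbal's implementation; `workflow_inputs` and the example step functions are hypothetical.

```python
import inspect


def fetch(url: str, timeout: int = 10) -> str:
    return f"<html from {url}>"


def process(raw: str, mode: str) -> str:
    return raw.upper() if mode == "upper" else raw


def workflow_inputs(steps):
    """Collect parameters that have no default and are not bound at the step."""
    inputs = []
    for fn, bound in steps:
        for name, param in inspect.signature(fn).parameters.items():
            if name in bound:
                continue  # fixed or wired at the step, so not a workflow input
            if param.default is inspect.Parameter.empty and name not in inputs:
                inputs.append(name)
    return inputs


# Each entry pairs a step function with the keyword arguments bound on that step.
steps = [
    (fetch, {}),
    (process, {"raw": "wired from fetch", "mode": "upper"}),
]
print(workflow_inputs(steps))
```

Only `url` is reported: `timeout` has a default, while `raw` and `mode` are bound on the `process` step and so stay private to it.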
# Collect final OutputEvent
result = await workflow(url="https://example.com").collect()
print(result.output)
print(result.status.code)  # "success" | "error" | "cancelled"
print(result.usage)

# Or stream events from every step
async for event in workflow(url="https://example.com"):
    print(event)
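For readers wondering how one call can support both styles, the toy model below shows an object that is async-iterable and also offers `collect()`. `ToyEvent` and `ToyRun` are hypothetical names invented for this sketch; they do not mirror Timbal's classes or event fields.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyEvent:
    type: str
    output: object = None


class ToyRun:
    """Hypothetical stand-in: async-iterable, with collect() returning the last event."""

    def __init__(self, events):
        self._events = events

    def __aiter__(self):
        return self._stream()

    async def _stream(self):
        for event in self._events:
            await asyncio.sleep(0)  # yield control, as a real step would
            yield event

    async def collect(self):
        # Drain the stream and keep only the final event.
        last = None
        async for event in self:
            last = event
        return last


async def main():
    events = [ToyEvent("start"), ToyEvent("output", output=42)]
    seen = [event.type async for event in ToyRun(events)]
    final = await ToyRun(events).collect()
    return seen, final


seen, final = asyncio.run(main())
print(seen, final.output)
```

The design point is that `collect()` is just a consumer of the same stream, so streaming and collecting observe identical events.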