Skip to main content

What Are Workflows?

Workflows are programmable execution pipelines that orchestrate step-by-step processing with explicit control flow. The following line creates an empty workflow, ready for steps to be added:
from timbal import Workflow

workflow = Workflow(name="my_workflow")

Building Blocks of a Workflow: Steps

Steps are the core units of work, which can process data, perform actions and pass results onward.

Adding Steps to the Workflow

Use .step() to add steps to workflows. Any Runnable is valid to create a step.
def celsius_to_fahrenheit(celsius: float) -> float:
    return (celsius * 9/5) + 32

def check_threshold(temperature: float, threshold: float) -> str:
    return "Alert!" if temperature > threshold else "Normal"
    
workflow = (Workflow(name="temperature_alert")
    .step(celsius_to_fahrenheit, celsius=35)
    .step(check_threshold, temperature=80, threshold=lambda: 85)
)

Step Names and Reusing Functions

Each step in a workflow must have a unique name. Like all Runnables, steps are identified by their names, which must be distinct within the workflow. To use the same function multiple times in a workflow, wrap it in a new Tool with a distinct name for each usage:
# Create a Tool to reuse the function
threshold_checker_tool = Tool(
    name="threshold_checker",
    handler=check_threshold
)

workflow = (Workflow(name="temperature_monitoring", temperature=80)
    .step(celsius_to_fahrenheit, celsius=35)
    .step(check_threshold, threshold=lambda: 85)
    .step(threshold_checker_tool, threshold=lambda: 100)
)

Running the Workflow

The workflow’s final output is automatically determined by the last executed step in the dependency graph. To run the workflow and receive directly the final output, use the collect() method:
result = await workflow().collect()
print(result.output)  # Output from the final executed step
However, you can also stream all events as they happen:
async for event in workflow():
    print(event)
The input and output from each step is stored in its Run Context, accessible via:
  • .input: Dictionary of parameters passed to the step
  • .output: Value(s) returned by the step. Can be a single value, dictionary, array, or custom class.
You can also store additional custom variables:
from timbal.state import get_run_context

async def process_user_data(user_id: str):
    # New variable saved in the step context:
    get_run_context().current_span().user_status = "active"
    return f"Processed user: {user_id}"

Key Features

  • Composition: Workflows can contain other workflows
  • Parallel Execution: Independent steps run concurrently
  • Type Safety: Automatic parameter validation via Pydantic
  • Error Isolation: Failed steps skip dependents, others continue
Check Control Flow and Branching for different step execution behaviors.