Workflows execute steps based on their dependencies. By understanding how Timbal resolves these dependencies, you can build efficient pipelines that run steps in parallel when possible and sequentially when needed.

Parallel by Default

When steps have no relationship between them, they run concurrently. Timbal doesn’t wait for one to finish before starting the next:
async def fetch_users():
    return ["Alice", "Bob"]

async def fetch_orders():
    return [{"id": 1}, {"id": 2}]

workflow = (
    Workflow(name="data_loader")
    .step(fetch_users)
    .step(fetch_orders)
)
# fetch_users and fetch_orders run in parallel
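Conceptually, independent steps behave like coroutines awaited together. A minimal asyncio sketch of the same pattern (plain Python, not Timbal code):

```python
import asyncio

async def fetch_users():
    await asyncio.sleep(0.05)  # simulate I/O latency
    return ["Alice", "Bob"]

async def fetch_orders():
    await asyncio.sleep(0.05)
    return [{"id": 1}, {"id": 2}]

async def main():
    # Both coroutines are scheduled concurrently, so the total wait is
    # roughly one sleep, not two — the same behavior Timbal gives
    # independent steps.
    return await asyncio.gather(fetch_users(), fetch_orders())

users, orders = asyncio.run(main())
print(users, orders)
```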

Context Access

The input and output of each step are stored in its Run Context, accessible via get_run_context(). Each step exposes two built-in variables:
  • .input: A dictionary of all the parameters passed to the step, accessed by name.
  • .output: The value(s) returned by the step. Can be a single value, dictionary, list, or custom class.
Steps can access data from their own context, the parent workflow, or any sibling step:
  • current_span(): the current step's data
  • parent_span(): the parent workflow's data
  • step_span("name"): any sibling step's data

Custom Variables

You can also create your own custom variables to share data between steps:
async def process_user(user_id: str):
    # Store custom data on this step's context
    get_run_context().current_span().user_status = "active"
    return f"Processed user: {user_id}"

async def check_status():
    # Access the custom variable from another step
    status = get_run_context().step_span("process_user").user_status
    return f"User status: {status}"

workflow = (
    Workflow(name="user_pipeline")
    .step(process_user, user_id="user_123")
    .step(check_status)
)
After the workflow runs, each step has the following variables in its context:

process_user:
.input["user_id"]  = "user_123"
.output            = "Processed user: user_123"
.user_status       = "active"

check_status:
.input   = {}
.output  = "User status: active"
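Since spans hold arbitrary attributes, you can picture one as a plain namespace object. A toy illustration with types.SimpleNamespace standing in for a step's span (not Timbal's actual span class):

```python
from types import SimpleNamespace

# Hypothetical stand-in for a step's span: any attribute can be set on it
span = SimpleNamespace(input={"user_id": "user_123"})
span.user_status = "active"  # custom variable, like current_span().user_status
span.output = f"Processed user: {span.input['user_id']}"

print(span.user_status)  # active
print(span.output)       # Processed user: user_123
```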

Data Dependencies

When a step parameter references another step’s output via a lambda, Timbal automatically creates a dependency and enforces sequential execution:
async def merge_results(users: list, orders: list) -> dict:
    return {"users": users, "orders": orders}

workflow = (
    Workflow(name="data_loader")
    .step(fetch_users)
    .step(fetch_orders)
    .step(merge_results,
        users=lambda: get_run_context().step_span("fetch_users").output,
        orders=lambda: get_run_context().step_span("fetch_orders").output,
    )
)
Here:
  • fetch_users and fetch_orders run in parallel (no dependencies between them)
  • merge_results waits for both to complete (its parameters reference their outputs)
The dependency is resolved automatically — you don’t need to declare it explicitly. Even though fetch_users and fetch_orders haven’t executed when merge_results is defined, Timbal detects the step_span() references and knows to wait.
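One way to picture this detection (a toy sketch, not Timbal's actual implementation — here the lambdas take the context as a parameter, unlike the real API): invoke each parameter lambda against a recording context that logs every step name requested via step_span(), and use the collected names as the step's dependencies.

```python
class _AnySpan:
    """Dummy span: tolerates any attribute access (.output, .input, ...)."""
    def __getattr__(self, _name):
        return None

class RecordingContext:
    """Stands in for the run context; records step names instead of data."""
    def __init__(self):
        self.requested = set()

    def step_span(self, name, default=None):
        self.requested.add(name)
        return _AnySpan()

def dependencies_of(param_lambdas):
    """Collect which sibling steps a step's parameter lambdas reference."""
    ctx = RecordingContext()
    for fn in param_lambdas.values():
        try:
            fn(ctx)  # hypothetical signature: lambda receives the context
        except Exception:
            pass  # we only care about the recorded names
    return ctx.requested

deps = dependencies_of({
    "users": lambda ctx: ctx.step_span("fetch_users").output,
    "orders": lambda ctx: ctx.step_span("fetch_orders").output,
})
print(sorted(deps))  # ['fetch_orders', 'fetch_users']
```

With the dependency set in hand, a scheduler only needs to delay a step until every name in its set has finished.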

Handling Optional Steps

When a step might be skipped (e.g., due to a when condition), you need to safely handle cases where that step didn’t execute. Use default=None with step_span() to check if a step ran before accessing its output.

Execution Flow

Consider a workflow that validates data and then processes it differently based on the validation result:
1. validate_data (always runs)

   ├─→ If valid: process_valid runs
   └─→ If invalid: process_invalid runs
   
2. merge_results (always runs)
   └─→ Needs to access either process_valid OR process_invalid
       (but not both, since only one will have executed)
The Problem: when merge_results tries to access process_valid or process_invalid, one of them won't exist because it was skipped. Without default=None, accessing a skipped step would raise an error.

The Solution: use step_span("name", default=None), which returns None if the step was skipped, letting you check which step actually ran.

Example

workflow = (
    Workflow(name="pipeline")
    # Step 1: Always runs - validates the input data
    .step(validate_data, data={"email": "user@example.com"})
    
    # Step 2: Only runs if validation passes
    .step(process_valid,
        data=lambda: get_run_context().step_span("validate_data").input.get("data"),
        when=lambda: get_run_context().step_span("validate_data").output.get("valid", False))
    
    # Step 3: Only runs if validation fails
    .step(process_invalid,
        data=lambda: get_run_context().step_span("validate_data").input.get("data"),
        when=lambda: not get_run_context().step_span("validate_data").output.get("valid", False))
    
    # Step 4: Always runs - merges results from validation and processing
    .step(merge_results,
        order=lambda: get_run_context().step_span("validate_data").input.get("data"),
        validated=lambda: get_run_context().step_span("validate_data").output,
        processed=lambda: (
            # Try to get process_valid output (returns None if step was skipped)
            get_run_context().step_span("process_valid", default=None).output
            if get_run_context().step_span("process_valid", default=None) is not None
            # Otherwise, try process_invalid output
            else (
                get_run_context().step_span("process_invalid", default=None).output
                if get_run_context().step_span("process_invalid", default=None) is not None
                else None
            )
        )
    )
)
How it works:
  1. validate_data always executes and returns {"valid": True} or {"valid": False}
  2. Based on the validation result, either process_valid or process_invalid runs (but never both)
  3. merge_results uses step_span("name", default=None) to safely check which processing step ran:
    • If step_span("process_valid", default=None) returns a span (not None), that step ran
    • If it returns None, the step was skipped, so we check process_invalid instead
Key Point: step_span("name", default=None) returns None if the step was skipped, allowing you to handle optional dependencies gracefully. Without default=None, accessing a skipped step would raise an error.
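The default=None semantics can be modeled with a plain dictionary of spans where skipped steps simply have no entry. A toy illustration (not Timbal's API; the spans dict and _MISSING sentinel are invented for this sketch):

```python
# Only steps that actually ran get an entry here
spans = {
    "validate_data": {"output": {"valid": True}},
    "process_valid": {"output": "processed"},
    # "process_invalid" was skipped, so it has no span
}

_MISSING = object()  # sentinel: distinguishes "no default given" from default=None

def step_span(name, default=_MISSING):
    """Return the span for `name`; raise if it never ran and no default was given."""
    if name in spans:
        return spans[name]
    if default is _MISSING:
        raise KeyError(f"step {name!r} did not run")
    return default

# The merge step's pattern: probe each optional step, fall back to the other
valid = step_span("process_valid", default=None)
invalid = step_span("process_invalid", default=None)
processed = (
    valid["output"] if valid is not None
    else invalid["output"] if invalid is not None
    else None
)
print(processed)  # processed
```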

Explicit Dependencies

Use depends_on when you need ordering without a data dependency:
workflow = (
    Workflow(name="pipeline")
    .step(init_database)
    .step(process_data, depends_on=["init_database"])
)
process_data waits for init_database even though it doesn’t use its output.

Summary

  • Steps run in parallel by default
  • Access sibling step data via step_span(), custom variables via current_span()
  • Lambda parameters automatically create dependencies — even if the referenced step hasn’t executed yet
  • Use depends_on for explicit ordering without data dependency