Skip to main content
A data processing pipeline that validates input and routes to different handlers based on the result. Only the matching branch executes.

Workflow

pipeline.py
from timbal import Workflow
from timbal.state import get_run_context


def validate_data(data: dict) -> str:
    """Classify a user record as ``"invalid"``, ``"minor"``, or ``"valid"``.

    Args:
        data: User record; only the ``"email"`` and ``"age"`` keys are read.

    Returns:
        ``"invalid"`` when ``"email"`` is missing or falsy, ``"minor"`` when
        the age is below 18 (a missing or ``None`` age counts as 0), and
        ``"valid"`` otherwise.
    """
    if not data.get("email"):
        return "invalid"
    # An explicit ``"age": None`` (e.g. a JSON null) is treated like a missing
    # key instead of raising TypeError on the ``<`` comparison below.
    age = data.get("age")
    if age is None:
        age = 0
    if age < 18:
        return "minor"
    return "valid"


def process_adult(data: dict) -> dict:
    """Build the result record for an adult user.

    The returned dict carries the status ``"processed"``, the user's email
    under ``"user"``, and the tier ``"adult"``. Raises ``KeyError`` when
    ``data`` has no ``"email"`` key.
    """
    email = data["email"]
    return dict(status="processed", user=email, tier="adult")


def process_minor(data: dict) -> dict:
    """Build the result record for a minor user, flagged as restricted.

    The returned dict carries the status ``"processed"``, the user's email
    under ``"user"``, the tier ``"minor"``, and ``"restricted": True``.
    Raises ``KeyError`` when ``data`` has no ``"email"`` key.
    """
    record = {"status": "processed"}
    record["user"] = data["email"]
    record["tier"] = "minor"
    record["restricted"] = True
    return record


def log_invalid(data: dict) -> dict:
    """Return the rejection record for an invalid user record.

    ``data`` is accepted for interface parity with the other handlers but is
    not inspected; the result is always the same rejection payload.
    """
    rejection = dict(status="rejected", reason="missing email")
    return rejection


# Branching workflow: validate_data runs first, then three conditional steps
# follow. Each branch's `when` lambda compares validate_data's output with one
# status string, and validate_data returns exactly one of "valid", "minor",
# or "invalid", so at most one `when` condition can be true for a given run.
# NOTE(review): presumably the framework evaluates the `data`/`when` lambdas
# lazily at run time, after validate_data has finished — confirm against the
# Timbal Workflow documentation.
pipeline = (
    Workflow(name="user_processor")
    # Entry step: the sample record is passed as a fixed `data` argument.
    .step(validate_data, data={"email": "user@example.com", "age": 25})
    # Each branch re-reads the `data` input recorded on the validate_data
    # span, so the handler receives the same record that was validated.
    .step(process_adult,
        data=lambda: get_run_context().step_span("validate_data").input.get("data"),
        when=lambda: get_run_context().step_span("validate_data").output == "valid")
    .step(process_minor,
        data=lambda: get_run_context().step_span("validate_data").input.get("data"),
        when=lambda: get_run_context().step_span("validate_data").output == "minor")
    .step(log_invalid,
        data=lambda: get_run_context().step_span("validate_data").input.get("data"),
        when=lambda: get_run_context().step_span("validate_data").output == "invalid")
)

How It Works

validate_data ──┬─→ process_adult (if "valid")
                 ├─→ process_minor (if "minor")
                 └─→ log_invalid (if "invalid")
  1. validate_data — checks email and age, returns “valid”, “minor”, or “invalid”
  2. process_adult, process_minor, or log_invalid — only the branch whose `when` condition evaluates to true runs; the other two are skipped

Running

# `await` is only valid inside an async context: run this inside an
# `async def` function, or in a REPL/notebook that supports top-level await.
result = await pipeline().collect()
# Print the `output` attribute of the collected result.
print(result.output)
The output will be (shown here formatted as JSON for readability):
{
  "status": "processed",
  "user": "user@example.com",
  "tier": "adult"
}