from timbal import Workflow
from timbal.state import get_run_context
def validate_data(data: dict) -> str:
    """Classify a user record as "invalid", "minor" or "valid".

    Args:
        data: User record; the "email" and "age" keys are read.

    Returns:
        "invalid" when "email" is missing or falsy, "minor" when the age
        is below 18 (a missing or None age counts as 0), "valid" otherwise.
    """
    if not data.get("email"):
        return "invalid"
    # dict.get's default only covers a missing key; an explicit None would
    # make the comparison below raise TypeError, so treat it as missing.
    age = data.get("age")
    if age is None:
        age = 0
    if age < 18:
        return "minor"
    return "valid"
def process_adult(data: dict) -> dict:
    """Build the processing result for a user in the adult tier.

    Args:
        data: User record; must contain an "email" key.

    Returns:
        Dict with "status", "user" (the record's email) and "tier".

    Raises:
        KeyError: If "email" is absent from ``data``.
    """
    email = data["email"]
    return dict(status="processed", user=email, tier="adult")
def process_minor(data: dict) -> dict:
    """Build the processing result for a user in the restricted minor tier.

    Args:
        data: User record; must contain an "email" key.

    Returns:
        Dict with "status", "user" (the record's email), "tier" and a
        "restricted" flag that is always True.

    Raises:
        KeyError: If "email" is absent from ``data``.
    """
    email = data["email"]
    return dict(status="processed", user=email, tier="minor", restricted=True)
def log_invalid(data: dict) -> dict:
    """Build the rejection result for a record that failed validation.

    Despite the name, nothing is written to a log here; the function only
    returns the rejection payload.

    Args:
        data: The rejected record; accepted but not read.

    Returns:
        Dict with "status" set to "rejected" and "reason" set to
        "missing email".
    """
    return dict(status="rejected", reason="missing email")
# Conditional workflow: validate_data is registered first with a fixed sample
# record, followed by three handler steps that each carry a `when` callable
# comparing the recorded validate_data output against one status string.
# "valid", "minor" and "invalid" are the only values validate_data returns,
# so at most one of the three conditions can be true for a given run.
# NOTE(review): presumably timbal evaluates the `data`/`when` callables at run
# time, after validate_data has finished -- confirm against the timbal docs.
pipeline = (
Workflow(name="user_processor")
.step(validate_data, data={"email": "user@example.com", "age": 25})
# Each handler's `data` callable reads back the "data" input recorded on the
# validate_data span, so the handler receives the same record that was
# validated rather than validate_data's status string.
.step(process_adult,
data=lambda: get_run_context().step_span("validate_data").input.get("data"),
when=lambda: get_run_context().step_span("validate_data").output == "valid")
.step(process_minor,
data=lambda: get_run_context().step_span("validate_data").input.get("data"),
when=lambda: get_run_context().step_span("validate_data").output == "minor")
# log_invalid ignores its `data` argument; it is still passed, matching the
# shape of the other handler steps.
.step(log_invalid,
data=lambda: get_run_context().step_span("validate_data").input.get("data"),
when=lambda: get_run_context().step_span("validate_data").output == "invalid")
)