Orchestrating AI Workflows

Design, connect, and control multi-step AI pipelines using Workflows—Timbal's flexible workflow engine.


What Are Workflows?

Workflows are programmable execution pipelines that orchestrate step-by-step processing with explicit control flow.

from timbal.core import Workflow
workflow = Workflow(name="my_workflow")

This creates an empty workflow. Once created, you build it up by adding components as building blocks.


Building Blocks of a Workflow

Steps

Steps are the core units of work.

Each step can be:

  • a function
  • a Tool
  • another workflow

Steps process data, perform actions, and pass results onward.
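
As a minimal sketch of the first two kinds (function and Tool names here are illustrative; the Tool constructor with name and handler is the one shown in the LLM example later on this page):

from timbal.core import Tool, Workflow

# A plain function used directly as a step.
def extract_keywords(text: str) -> list[str]:
    return [word for word in text.split() if len(word) > 5]

def clean_text(text: str) -> str:
    return text.strip().lower()

# The same kind of handler wrapped in a Tool.
cleaner = Tool(name="cleaner", handler=clean_text)

workflow = (Workflow(name="step_kinds")
    .step(cleaner)            # Tool step
    .step(extract_keywords)   # function step
)

Nested workflows as steps are covered under Adding Steps to Your Workflow below.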

Links

Links define execution dependencies between steps.

Links can be:

  • explicit (manual)
  • implicit (automatic via data references)

Links ensure proper execution order and create a DAG.

workflow = (Workflow(name="my_workflow")
    .step(step_1)
    .step(step_2)
    .link("step_1", "step_2")  # names of the tools (or the function names if no Tool name is set)
)

DAG-Based Execution

Workflows form a Directed Acyclic Graph (DAG) where:

  • Steps can run in parallel when dependencies allow
  • No circular dependencies (prevents infinite loops)
  • Automatic dependency resolution and execution ordering
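
For instance, here is a sketch of a diamond-shaped DAG (step and function names are illustrative): two transforms both read fetch's output via the data references described below, so they can run in parallel, and merge waits for both.

from timbal.core import Workflow
from timbal.state import get_run_context

def fetch() -> str:
    return "raw data from somewhere"

def uppercase(text: str) -> str:
    return text.upper()

def word_count(text: str) -> int:
    return len(text.split())

def merge(upper: str, count: int) -> str:
    return f"{upper} ({count} words)"

# uppercase and word_count both depend only on fetch, so they may run in
# parallel once fetch finishes; merge depends on both and runs last.
workflow = (Workflow(name="diamond_dag")
    .step(fetch)
    .step(uppercase, text=lambda: get_run_context().get_data("fetch.output"))
    .step(word_count, text=lambda: get_run_context().get_data("fetch.output"))
    .step(merge,
        upper=lambda: get_run_context().get_data("uppercase.output"),
        count=lambda: get_run_context().get_data("word_count.output"))
)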

Adding Steps to Your Workflow

Workflows use the .step() method to add steps. Each step can be one of the following (see the sketch after this list):

  • Functions: Direct function references
  • Tools: Tool objects with handlers
  • Dictionaries: Tool configurations
  • Other Workflows: Nested workflow components
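
A sketch of these forms in one workflow (names are illustrative; the dictionary form is omitted here because its exact schema isn't shown on this page):

from timbal.core import Tool, Workflow

def clean_text(text: str) -> str:
    return text.strip().lower()

def word_count(text: str) -> int:
    return len(text.split())

# A workflow that will be nested as a single step of the parent workflow.
preprocessing = Workflow(name="preprocessing").step(clean_text)

# A Tool object with a handler.
counter = Tool(name="counter", handler=word_count)

workflow = (Workflow(name="document_pipeline")
    .step(preprocessing)   # nested workflow step
    .step(counter)         # Tool step
    .step(word_count)      # function step
)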

Adding Steps with Parameters

You can pass fixed parameters to steps using keyword arguments:

workflow = (Workflow(name="temperature_alert")
    .step(celsius_to_fahrenheit, celsius=35)
    .step(check_threshold, threshold=lambda: 6)
)

Connecting Steps

Use get_run_context().get_data("step_name.output") to access outputs from previous steps:

from timbal.state import get_run_context

workflow = (Workflow(name="temperature_alert")
    .step(step1, celsius=35)
    .step(step2, temperature=lambda: get_run_context().get_data("step1.output"))
)

In the above example, you don't need .link() because step2 uses step1's output. When a step depends on another step's data, they run sequentially (implicit linking).

To force sequential execution between steps that don't share data, use .link():

import time

def fetch_data():  # Takes 2 seconds
    time.sleep(2)
    return "data"

def process_data():  # Takes 3 seconds
    time.sleep(3)
    return "processed"

workflow = (Workflow(name="sequential_flow")
    .step(fetch_data)      # Starts at 0s, finishes at 2s
    .step(process_data)    # Starts at 2s, finishes at 5s
    .link("fetch_data", "process_data")
)
# Total time: 5 seconds (2 + 3)

Without .link(), steps run in parallel:

import time

def send_email():  # Takes 2 seconds
    time.sleep(2)
    return "email sent"

def update_database():  # Takes 3 seconds
    time.sleep(3)
    return "db updated"

workflow = (Workflow(name="parallel_flow")
    .step(send_email)        # Starts at 0s, finishes at 2s
    .step(update_database)   # Starts at 0s, finishes at 3s
)
# Total time: 3 seconds (max of 2 and 3)

Integrating LLMs

You can add LLMs as steps. Timbal provides the llm_router function, which can be wrapped in a Tool and used as a step.

from timbal.core import Tool
from timbal.core.workflow import Workflow
from timbal.core.llm_router import llm_router
from timbal.state import get_run_context
from timbal.types.message import Message

def get_email() -> str:
    return "Hi team, let's meet tomorrow at 10am to discuss the project. Best, Alice"

openai_llm = Tool(
    name="openai_llm",
    handler=llm_router,
    default_params={
        "model": "openai/gpt-4o-mini",
        "system_prompt": "You are a helpful assistant that summarizes emails concisely.",
    },
)

workflow = (
    Workflow(name="email_summarizer")
    .step(get_email)
    .step(openai_llm, messages=lambda: [Message.validate(f"Summarize this email: {get_run_context().get_data('get_email.output')}")])
    .link("get_email", "openai_llm")
)

Default Parameters

Note in the example above that the openai_llm step receives its parameters from both default_params (model and system_prompt) and runtime parameters (messages).

Default parameters in Timbal allow you to set predefined values that are automatically injected into your runnable components, providing flexibility and reducing boilerplate code.

Default parameters are defined when creating a runnable and are merged with runtime parameters. Runtime parameters always override default parameters.
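
As a sketch of this merge behavior (assuming step-level keyword arguments count as runtime parameters, as messages does in the example above), the step below keeps model from default_params while its own system_prompt wins over the default one:

from timbal.core import Tool, Workflow
from timbal.core.llm_router import llm_router
from timbal.types.message import Message

summarizer = Tool(
    name="summarizer",
    handler=llm_router,
    default_params={
        "model": "openai/gpt-4o-mini",
        "system_prompt": "Summarize emails concisely.",
    },
)

workflow = (Workflow(name="bullet_summaries")
    .step(summarizer,
        # Runtime parameter: overrides the system_prompt from default_params,
        # while "model" is still taken from default_params (assumed merge rule).
        system_prompt="Summarize emails as bullet points.",
        messages=lambda: [Message.validate("Summarize: the quarterly results look strong.")])
)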


Running Your Workflow

Once your workflow is defined, you can execute it in two main ways:

Get the final output:

result = await workflow().collect()
print(result.output)

Stream events as they happen:

async for event in workflow():
    print(event)

If a function in your flow needs a value per run (e.g., x), pass it when you call the workflow:

  • Inputs are routed only to steps that declare those parameters
  • Runtime inputs override step defaults
  • The same input name can feed multiple steps unless a step overrides it

from timbal.core import Workflow

def multiply(x: int) -> int:
    return x * 2

workflow = Workflow(name="simple_flow").step(multiply)

# Run with a per-run input
result = await workflow(x=1).collect()  # x is routed to multiply
print(result.output)  # 2

# Or stream events while using the same input
async for event in workflow(x=3):
    print(event)  # Final output will be 6
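
And a small sketch of routing the same input to several steps (illustrative names): both functions declare a parameter called x, so one per-run value feeds both.

from timbal.core import Workflow

def double(x: int) -> int:
    return x * 2

def triple(x: int) -> int:
    return x * 3

# Both steps declare x, so the same per-run input is routed to each of them.
workflow = (Workflow(name="routing_demo")
    .step(double)
    .step(triple)
)
result = await workflow(x=4).collect()  # x=4 feeds both double and triple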

Key Features

  • Parallel Execution: Independent steps run concurrently
  • Error Isolation: Failed steps skip dependents, others continue
  • Type Safety: Automatic parameter validation via Pydantic
  • Composition: Workflows can contain other workflows

For more, see Advanced Workflow Concepts and Examples.