Any Timbal Runnable (Tool, Agent, or Workflow step) can require human approval before it runs. When a gate fires, the run cancels with a structured ApprovalEvent; you resume by calling the runnable again with approval_decisions={...}. Use approvals for irreversible actions: refunds, deploys, account deletions, outbound emails, anything that costs money or moves data.

Quick Start

Mark a tool as approval-required, stream events, and resume with a decision keyed by approval_id.
from timbal import Agent, Tool
from timbal.types.events import ApprovalEvent


def refund_customer(amount: int) -> str:
    return f"refunded ${amount}"


refund = Tool(
    handler=refund_customer,
    requires_approval=lambda amount: amount > 100,
    approval_prompt=lambda amount: f"Approve refunding ${amount}?",
)

agent = Agent(
    name="support_agent",
    model="openai/gpt-5",
    tools=[refund],
)

approval_id = None
async for event in agent(prompt="Refund $250"):
    if isinstance(event, ApprovalEvent):
        approval_id = event.approval_id

result = await agent(
    prompt="Refund $250",
    approval_decisions={approval_id: True},
).collect()
When a gate fires, the run ends with status.code == "cancelled" and status.reason == "approval_required". The ApprovalEvent carries:
  • approval_id — stable id used to resolve the gate
  • runnable_path, runnable_name, runnable_type — what was about to execute
  • input — validated handler input (after redaction, if configured)
  • prompt, description — human-facing strings
  • t0 — Unix-ms timestamp of when approval was requested (useful for SLA timers)
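As an illustration of how these fields might feed a review UI, the sketch below renders a one-line summary and uses t0 to flag requests that have waited past an SLA. `PendingApproval` is a hypothetical stand-in that mirrors only some of the fields listed above; it is not Timbal's `ApprovalEvent` class, and the 5-minute SLA is an arbitrary assumption.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class PendingApproval:
    """Hypothetical stand-in mirroring some ApprovalEvent fields."""
    approval_id: str
    runnable_path: str
    prompt: str
    input: dict[str, Any]
    t0: int  # Unix-ms timestamp of the approval request


def waiting_ms(pending: PendingApproval, now_ms: int) -> int:
    """Milliseconds the request has been waiting, clamped at zero."""
    return max(0, now_ms - pending.t0)


def review_card(pending: PendingApproval, now_ms: int, sla_ms: int = 300_000) -> str:
    """Render a one-line summary and flag requests that are past the SLA."""
    age = waiting_ms(pending, now_ms)
    flag = "OVERDUE" if age > sla_ms else "ok"
    return f"[{flag}] {pending.runnable_path}: {pending.prompt} (waiting {age // 1000}s)"
```

Passing `now_ms` explicitly keeps the helpers deterministic, which makes SLA logic easy to unit test.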

Configuring Approval Gates

requires_approval accepts True, False, or a callable that receives the same kwargs as the handler and returns bool. approval_prompt and approval_description accept strings or callables and surface in the ApprovalEvent so the human reviewer has context.
high_risk_deploy = Tool(
    handler=deploy_handler,
    requires_approval=lambda env, **_: env == "production",
    approval_prompt=lambda env, **_: f"Deploy to {env}?",
    approval_description="Deploys ship traffic to the listed environment.",
)
If requires_approval or approval_prompt raises, the runnable neither approves silently nor executes. It ends with status.code == "error" and status.reason == "approval_policy_error", which is distinct from handler errors so dashboards can surface policy bugs separately.
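The fail-closed behaviour described above can be pictured with a small model. This is a sketch of the semantics only, not Timbal's internal implementation; `evaluate_gate` and its string return values are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Union

Policy = Union[bool, Callable[..., bool]]


def evaluate_gate(policy: Policy, kwargs: dict[str, Any]) -> str:
    """Return 'run', 'approval_required', or 'approval_policy_error'."""
    if isinstance(policy, bool):
        return "approval_required" if policy else "run"
    try:
        # The callable receives the same kwargs as the handler.
        needs_approval = bool(policy(**kwargs))
    except Exception:
        # A buggy policy must never fall through to execution.
        return "approval_policy_error"
    return "approval_required" if needs_approval else "run"
```

The key design point is that an exception in the policy maps to its own outcome instead of defaulting to either "approved" or "run".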

Resolutions: Approve, Deny, Audit

Pass either a bare bool (True/False) or an ApprovalResolution for richer audit fields:
from timbal.types.approval import ApprovalResolution

result = await agent(
    prompt="Refund $250",
    approval_decisions={
        approval_id: ApprovalResolution(
            approved=False,
            reason="Refund exceeds policy limit.",
            approver_id="user_42",
            comment="Customer is outside the refund window.",
        )
    },
).collect()
Audit fields are first-class (not free-form metadata) and persist under span.metadata["approval"]["resolution"]:
  • approver_id — who decided
  • comment — free-form reasoning
  • decided_at — Unix-ms timestamp; defaults to construction time. Pass an explicit value if you replay decisions and need idempotency
  • metadata — org-specific extras

Tool denial vs Agent denial

Behavior differs based on who initiated the call:
  • Direct tool call — denial returns status.code == "cancelled" and status.reason == "approval_denied". The handler does not run.
  • Tool called by an Agent — denial is converted into a ToolResultContent so the model can see “this tool was denied” and choose another path (apologize, escalate, try an alternative). The agent does not crash.

Time-Limited Decisions

Bound decisions with expires_at (Unix-ms). Expired resolutions are ignored at gate time and the gate emits a fresh ApprovalEvent with metadata["approval"]["expired"] == True:
import time
from timbal.types.approval import ApprovalResolution

decision = ApprovalResolution(
    approved=True,
    approver_id="user_42",
    comment="Approved from the support console.",
    expires_at=int(time.time() * 1000) + 60_000,  # valid for 60s
)
This is useful when an operator stamps a decision in a UI but a worker doesn’t pick it up immediately: a stale decision forces a fresh review instead of silently going through.
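For reference, the Unix-ms arithmetic behind expires_at can be sketched as below. The helper names are hypothetical, and treating a decision as expired exactly at the deadline (`>=`) is an assumption of this sketch; the boundary Timbal uses is not stated here.

```python
import time
from typing import Optional


def now_ms() -> int:
    """Current time as a Unix timestamp in milliseconds."""
    return int(time.time() * 1000)


def expires_in(seconds: float, now: Optional[int] = None) -> int:
    """Unix-ms deadline `seconds` after `now` (defaults to the current time)."""
    base = now_ms() if now is None else now
    return base + int(seconds * 1000)


def is_expired(expires_at: Optional[int], now: Optional[int] = None) -> bool:
    """In this sketch, a decision without expires_at never expires."""
    if expires_at is None:
        return False
    current = now_ms() if now is None else now
    return current >= expires_at
```

A UI could call `expires_in(60)` when the reviewer clicks Approve and pass the result as expires_at.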

Redacting Approval Input

Approval input is shown to humans and written to traces. If a gated runnable receives secrets or PII, redact the public approval snapshot. The simple form lists keys to mask with "***":
rotate_key = Tool(
    handler=rotate_key_impl,
    requires_approval=True,
    approval_prompt="Rotate this API key?",
    approval_redact_keys=["api_key", "password"],
)
For custom logic, use approval_redactor. It receives a copy of the validated input dict and returns the public snapshot:
rotate_key = Tool(
    handler=rotate_key_impl,
    requires_approval=True,
    approval_redactor=lambda input: {
        **input,
        "api_key": "***",
        "customer_email": input["customer_email"].split("@")[0] + "@***",
    },
)
The redacted snapshot is used everywhere the input would otherwise be visible: ApprovalEvent.input, span.input while the gate is pending, span.metadata["approval"]["input"], OutputEvent.metadata["pending_approvals"], and any exporter (OTel, Langfuse, etc.). The handler still receives the original unredacted input when the approval is resumed.
A redactor that raises or returns a non-dict falls back to a placeholder so secrets never leak through a buggy redactor.
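The redaction rules above (mask listed keys, run an optional custom redactor on a copy, fall back to a placeholder on failure) can be modelled in plain Python. This is a sketch under assumptions: `public_snapshot`, the placeholder shape, and applying key masking before the custom redactor are all choices made for illustration, not Timbal's documented behaviour.

```python
from typing import Any, Callable, Iterable, Optional

MASK = "***"
PLACEHOLDER = {"redacted": True}  # hypothetical placeholder shape


def public_snapshot(
    validated_input: dict[str, Any],
    redact_keys: Iterable[str] = (),
    redactor: Optional[Callable[[dict[str, Any]], Any]] = None,
) -> dict[str, Any]:
    """Build the reviewer-facing snapshot without mutating the original input."""
    snapshot = dict(validated_input)  # shallow copy; the handler keeps the original
    for key in redact_keys:
        if key in snapshot:
            snapshot[key] = MASK
    if redactor is None:
        return snapshot
    try:
        result = redactor(dict(snapshot))
    except Exception:
        # A buggy redactor must not leak the raw input.
        return dict(PLACEHOLDER)
    return result if isinstance(result, dict) else dict(PLACEHOLDER)
```

Failing to a placeholder instead of the raw input is what keeps secrets out of traces when the redactor itself is wrong.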

approval_id Semantics

The approval_id is derived from (runnable_path, validated_input). Calls with the same path and input share one decision, so a single resolution survives retries of the same call (stream resumes, transient failures, agent loops re-asking for the same tool). Treat the id as opaque: the derivation is an internal contract and may change across SDK versions, so don’t persist ids across deploys.
For irreversible operations (money movement, destructive deletes) where every call must require a fresh decision, include a unique value in the input so each call derives a distinct approval_id:
from uuid import uuid4

result = await agent(
    prompt="Wire $5,000 to vendor X",
    # The agent passes idempotency_key through to the tool; each call
    # gets a unique key, so each call gets its own approval_id.
)
Alternatively, expose an idempotency_key: str parameter on the tool and set default_params={"idempotency_key": lambda: str(uuid4())}; Timbal evaluates the callable on every invocation.
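To see why a unique input value yields a fresh id, consider a toy derivation that hashes the path together with a canonical JSON encoding of the input. This is explicitly not Timbal's derivation (which is internal and may change); `sketch_approval_id` and the `agent.wire` path are hypothetical.

```python
import hashlib
import json
from typing import Any
from uuid import uuid4


def sketch_approval_id(runnable_path: str, validated_input: dict[str, Any]) -> str:
    """Illustrative only: hash of path plus canonical JSON input."""
    canonical = json.dumps(validated_input, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(f"{runnable_path}\n{canonical}".encode("utf-8")).hexdigest()
    return digest[:16]


# Same path and same input (key order aside) give the same id.
same_a = sketch_approval_id("agent.wire", {"amount": 5000, "vendor": "X"})
same_b = sketch_approval_id("agent.wire", {"vendor": "X", "amount": 5000})

# Adding a per-call unique value gives a different id on every call.
fresh = sketch_approval_id(
    "agent.wire",
    {"amount": 5000, "vendor": "X", "idempotency_key": str(uuid4())},
)
```

Any deterministic function of (path, input) has this property, which is why the idempotency-key pattern works regardless of the actual derivation.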

Approvals in Workflows

Workflow steps follow the same rules as tools — requires_approval is a Runnable config, so wrap the function in a Tool (or use any Runnable) before adding it as a step. When a gated step fires, the workflow run cancels with approval_required and emits one ApprovalEvent per pending gate. Independent gates fire in parallel — you don’t need to ping-pong one approval at a time.
from timbal import Tool, Workflow

deploy_prod = Tool(
    name="deploy_prod",
    handler=deploy_prod_impl,
    requires_approval=True,
    approval_prompt="Promote to prod?",
)

workflow = (
    Workflow(name="release_pipeline")
    .step(deploy_staging)
    .step(deploy_prod)
    .step(announce, depends_on=["deploy_prod"])
)
When you resume with approval_decisions={...}, only the steps you decided on advance. Other pending gates remain pending and re-emit on the next call. This means you can approve a subset, observe what runs, and decide on the rest later.
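The effect of approving only a subset of gates can be pictured with a toy dependency walk. This is a simplified model, not Timbal's scheduler; `runnable_steps` is a hypothetical helper, and the dependency map in the usage note is an assumption about how the example pipeline is wired.

```python
def runnable_steps(
    deps: dict[str, list[str]],
    gated: set[str],
    approved: set[str],
) -> list[str]:
    """Steps that can complete given the approved gates.

    A gated step without an approval blocks itself and everything
    that depends on it; unrelated steps still advance.
    """
    done: list[str] = []
    remaining = dict(deps)
    progressed = True
    while progressed:
        progressed = False
        for step, parents in list(remaining.items()):
            blocked_by_gate = step in gated and step not in approved
            if not blocked_by_gate and all(p in done for p in parents):
                done.append(step)
                del remaining[step]
                progressed = True
    return done
```

With `deps = {"deploy_staging": [], "deploy_prod": [], "announce": ["deploy_prod"]}` and `gated={"deploy_prod"}`, nothing downstream of deploy_prod advances until its id appears in `approved`.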

Durable Resume Across Processes

The default InMemoryTracingProvider only resumes within the same Python process. For “approve in a UI now, resume in a worker later” workflows, switch to a durable provider:
from pathlib import Path
from timbal import Agent
from timbal.state.tracing.providers import JsonlTracingProvider

provider = JsonlTracingProvider.configured(_path=Path("traces.jsonl"))

agent = Agent(
    name="support_agent",
    model="openai/gpt-5",
    tools=[refund],
    tracing_provider=provider,
)
JsonlTracingProvider writes one record per run and uses a sidecar lock file (traces.jsonl.approval_claims.json + .lock via fcntl) for cross-process approval claims. Good for local dev and single-host deployments. Not recommended for high-throughput production — _store() rewrites the file on each run.
To resume from a different process, pass the original run id as parent_id:
result = await agent(
    prompt="Refund $250",
    parent_id=paused_run_id,
    approval_decisions={approval_id: True},
).collect()
Timbal loads the parent trace (input messages, pending gates, prior tool calls) before executing the resolution, so the runnable sees exactly the state it was paused at.

Duplicate Worker Protection

When multiple workers consume the same approval queue, two of them might race to resume the same (parent_id, approval_id). Timbal claims the pair before executing the resolution. The first claimer wins; later duplicates stop before handler execution with status.reason == "approval_already_claimed".
result = await agent(
    prompt="Refund $250",
    parent_id=paused_run_id,
    approval_decisions={approval_id: True},
).collect()

if result.status.reason == "approval_already_claimed":
    # Another worker already executed this approval. Safe to no-op.
    return
This protection is implemented by JsonlTracingProvider and SqliteTracingProvider out of the box. Custom providers must override claim_approval(parent_id, approval_id, run_id) to get the same durable-lock behavior — the base class default is a no-op.
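The first-claimer-wins rule can be sketched with an in-memory registry. This only illustrates the semantics: `InMemoryClaims` is hypothetical, the boolean return convention is an assumption, and a real custom provider would need durable, cross-process storage (a database row with a unique constraint, for example) behind its claim_approval override.

```python
import threading


class InMemoryClaims:
    """Toy first-claimer-wins registry keyed by (parent_id, approval_id)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._owners: dict[tuple[str, str], str] = {}

    def claim(self, parent_id: str, approval_id: str, run_id: str) -> bool:
        """Return True if run_id owns the claim, False if another run got there first."""
        key = (parent_id, approval_id)
        with self._lock:
            # setdefault stores run_id only when the key is new.
            owner = self._owners.setdefault(key, run_id)
        return owner == run_id
```

Making the claim idempotent for the same run_id means a worker that retries its own claim is not mistaken for a duplicate.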

Enumerating Pending Approvals

When a run cancels with approval_required and the runnable has multiple concurrent tool calls, each gate emits its own ApprovalEvent. There are two ergonomic ways to enumerate them. During the stream, capture every ApprovalEvent:
pending = []
async for event in agent(prompt="..."):
    if isinstance(event, ApprovalEvent):
        pending.append(event)
After .collect(), read OutputEvent.metadata["pending_approvals"]:
result = await agent(prompt="...").collect()

if result.status.reason == "approval_required":
    for entry in result.metadata["pending_approvals"]:
        print(entry["approval_id"], entry["runnable_path"], entry["prompt"], entry["input"])
The collector attaches the full list there because result.status only references the first gate. Resume by passing all decisions you want to settle in one call:
decisions = {entry["approval_id"]: True for entry in result.metadata["pending_approvals"]}
result = await agent(prompt="...", parent_id=paused_run_id, approval_decisions=decisions).collect()
For server-side traversal of a loaded trace (e.g. building a review queue from durable storage), RunContext.pending_approvals() walks RunContext._trace directly. It tolerates both live RunStatus and dict-after-reload shapes, so it works against in-memory, JSONL, SQLite, and platform traces. Returned entries use the redacted approval input snapshot — never the raw secrets.
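When building a review queue, it can help to auto-decide some entries and leave the rest for a human. The sketch below works on plain dicts shaped like the pending_approvals entries shown above (approval_id, runnable_path, prompt, input); `decide` and the rule signature are hypothetical helpers, not part of Timbal.

```python
from typing import Any, Callable, Optional

Entry = dict[str, Any]
Rule = Callable[[Entry], Optional[bool]]


def decide(entries: list[Entry], rule: Rule) -> tuple[dict[str, bool], list[Entry]]:
    """Split entries into a decisions dict and a list still awaiting review.

    The rule returns True/False to decide an entry, or None to leave it pending.
    """
    decisions: dict[str, bool] = {}
    undecided: list[Entry] = []
    for entry in entries:
        verdict = rule(entry)
        if verdict is None:
            undecided.append(entry)
        else:
            decisions[entry["approval_id"]] = verdict
    return decisions, undecided
```

The resulting `decisions` dict has the shape expected by approval_decisions; entries left in `undecided` would simply re-emit on the next call.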

Observability

Status reasons

When a run stops at an approval gate, OutputEvent.status.reason carries one of:
  • approval_required — a gate emitted an ApprovalEvent and is waiting on a decision
  • approval_denied — a denying resolution was consumed (direct call only; agents convert this to a tool result)
  • approval_already_claimed — durable claim said another worker already resumed this gate
  • approval_policy_error — a requires_approval / approval_prompt callable raised
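A worker or UI layer will typically branch on these reasons. The mapping below is a minimal sketch; the action names on the right are hypothetical and stand in for whatever your application does in each case.

```python
from typing import Optional

# Hypothetical application-level actions keyed by status reason.
_ACTIONS = {
    "approval_required": "ask_reviewer",
    "approval_denied": "notify_requester",
    "approval_already_claimed": "noop",
    "approval_policy_error": "alert_policy_owner",
}


def next_action(reason: Optional[str]) -> str:
    """Map a status reason to an action; anything else continues normally."""
    return _ACTIONS.get(reason, "continue")
```

Keeping approval_policy_error on its own branch mirrors the point made earlier: policy bugs deserve a different alert than handler failures.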

Usage counters

OutputEvent.usage records approval-lifecycle counters so you can plot them in dashboards:
  • approvals:required — a gate emitted an ApprovalEvent
  • approvals:approved — a valid approved resolution was consumed
  • approvals:denied — a valid denied resolution was consumed
  • approvals:expired — an expired resolution was ignored and the gate re-emitted
These propagate through the usage merge tree just like token counts, so a parent agent run aggregates approval counts from every nested tool/workflow gate.
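Conceptually, the aggregation is a key-wise sum. The sketch below shows that idea with `collections.Counter`; it assumes usage is a flat mapping of counter name to integer and is not Timbal's merge code.

```python
from collections import Counter
from typing import Mapping


def merge_usage(*usages: Mapping[str, int]) -> dict[str, int]:
    """Key-wise sum of usage counters from nested runs."""
    total: Counter = Counter()
    for usage in usages:
        total.update(usage)  # Counter.update adds counts instead of replacing them
    return dict(total)
```

For example, merging `{"approvals:required": 1}` from one tool with `{"approvals:required": 2, "approvals:approved": 1}` from another yields 3 required and 1 approved at the parent.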

Common Patterns

  1. Configure a durable provider (JsonlTracingProvider / SqliteTracingProvider / PlatformTracingProvider).
  2. The UI calls agent(prompt=...), captures each ApprovalEvent (or polls the trace for pending_approvals()), and shows the reviewer the prompt and the redacted input.
  3. Reviewer clicks Approve/Deny. UI persists (approval_id, ApprovalResolution) to a queue.
  4. Worker pulls the message and calls agent(prompt=..., parent_id=run_id, approval_decisions={approval_id: resolution}).
  5. If result.status.reason == "approval_already_claimed", no-op. Otherwise, the handler executed exactly once.
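Steps 3 and 4 above need a message format that survives the trip through a queue. A minimal sketch follows, assuming JSON messages; `QueuedDecision`, `encode`, and `decode` are hypothetical names, and the fields are a subset of the audit fields described earlier.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class QueuedDecision:
    """Hypothetical queue message carrying one reviewer decision."""
    run_id: str
    approval_id: str
    approved: bool
    approver_id: str
    comment: Optional[str] = None


def encode(message: QueuedDecision) -> str:
    """Serialize for the queue (step 3)."""
    return json.dumps(asdict(message), sort_keys=True)


def decode(raw: str) -> QueuedDecision:
    """Deserialize in the worker (step 4)."""
    return QueuedDecision(**json.loads(raw))


# In the worker, the decoded message would feed the resume call shown above:
#   agent(prompt=..., parent_id=msg.run_id,
#         approval_decisions={msg.approval_id: <resolution built from msg>})
```

Carrying run_id alongside approval_id in the message is what lets any worker resume the right run without shared in-process state.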
The agent may call multiple gated tools in parallel. Each gate emits its own ApprovalEvent, and the run cancels once every parallel call has settled. Collect all approval_ids, present them as a checklist, and resume with the full dict in one call.
The default approval_id is stable across retries of the same (path, input). To require a fresh decision per invocation, include a per-call unique value in the input — typically an idempotency_key=str(uuid4()) — so each call derives a distinct id.
Use approval_redact_keys=["api_key", "password"] for the simple case. Use approval_redactor=lambda input: {...} for partial masking (e.g. keep the local part of an email but mask the domain). The handler still receives the unredacted input.
