Skip to main content

What are Hooks?

Hooks allow workflows to suspend execution and wait for external events. Unlike sleep() which resumes after a time duration, hooks wait for an external system or user to provide data before continuing.
from pydantic import BaseModel
from pyworkflow import define_hook, workflow

class ApprovalPayload(BaseModel):
    approved: bool
    reviewer: str
    comments: str | None = None

# Define a typed hook with Pydantic validation
approval_hook = define_hook("manager_approval", ApprovalPayload)

@workflow()
async def approval_workflow(order_id: str):
    order = await prepare_order(order_id)

    # Workflow suspends here - waits for external approval
    approval: ApprovalPayload = await approval_hook(timeout="7d")

    if approval.approved:
        return await fulfill_order(order)
    else:
        return await cancel_order(order, approval.comments)

How It Works

Workflow Execution


┌───────────────┐
│ Execute Steps │
└───────┬───────┘


┌───────────────────┐
│ await hook(...)   │
└───────┬───────────┘

        ├─── 1. Generate token (run_id:hook_id)

        ├─── 2. Record hook_created event

        ├─── 3. Store hook with schema (for CLI)

        ├─── 4. Call on_created callback with token

        ├─── 5. Raise SuspensionSignal

        └─── 6. Worker is freed

                  │  ... waiting for external event ...


        ┌─────────────────────┐
        │ External system     │
        │ calls resume_hook() │
        │ with token + payload│
        └────────┬────────────┘

                 ├─── 7. Validate payload (if typed)

                 ├─── 8. Record hook_received event

                 └─── 9. Schedule workflow resumption


                 ┌───────────────┐
                 │ Replay events │
                 │ Resume work   │
                 └───────────────┘

Choosing a Hook Type

PyWorkflow offers two ways to define hooks:
FeatureTypedHook (Recommended)Simple Hook
Type safetyFull Pydantic validationDict with any keys
CLI promptsInteractive field-by-fieldJSON payload only
IDE supportAutocomplete, type hintsNone
Schema storedYes (enables CLI features)No
Best forProduction workflowsQuick prototypes
We recommend TypedHook for all production workflows. The Pydantic schema enables CLI interactive prompts, payload validation, and better IDE support.
TypedHook combines hook suspension with Pydantic validation for type-safe payloads.

Defining a Typed Hook

from pydantic import BaseModel
from typing import Optional
from pyworkflow import define_hook

# 1. Define the payload schema
class ApprovalPayload(BaseModel):
    approved: bool
    reviewer: str
    comments: Optional[str] = None

# 2. Create the typed hook
approval_hook = define_hook("manager_approval", ApprovalPayload)

Using in a Workflow

from pyworkflow import workflow

@workflow()
async def order_approval(order_id: str):
    order = await prepare_order(order_id)

    async def on_hook_created(token: str):
        print(f"Approval needed! Token: {token}")
        # Send token to external system, email, Slack, etc.

    # Wait for approval - returns validated ApprovalPayload
    approval: ApprovalPayload = await approval_hook(
        timeout="7d",
        on_created=on_hook_created,
    )

    # Type-safe access to payload fields
    if approval.approved:
        print(f"Approved by {approval.reviewer}")
        return await fulfill_order(order)
    else:
        return await cancel_order(order, approval.comments or "Rejected")

CLI Interactive Resume

When you run pyworkflow hooks resume, the CLI uses the stored Pydantic schema to prompt for each field:
$ pyworkflow hooks resume

? Select a pending hook to resume:
  > manager_approval (run_abc123:hook_manager_approval_1)

? approved (bool): yes
? reviewer (str): admin@example.com
? comments (str, optional): Looks good!

Hook resumed successfully.

Simple Hook

For quick prototyping or when you don’t need typed payloads, use the hook() function directly.
from pyworkflow import hook, workflow

@workflow()
async def simple_approval(order_id: str):
    order = await prepare_order(order_id)

    async def on_hook_created(token: str):
        print(f"Resume with: pyworkflow hooks resume {token}")

    # Wait for any payload (untyped dict)
    approval = await hook(
        "approval",
        timeout="24h",
        on_created=on_hook_created,
    )

    # Payload is a dict - no type safety
    if approval.get("approved"):
        return await fulfill_order(order)
    else:
        return await cancel_order(order, approval.get("reason", "Rejected"))
Simple hooks don’t enable CLI interactive prompts. You must provide the full JSON payload when resuming.

Resuming Hooks

Using the CLI

# List all pending hooks
pyworkflow hooks list --status pending

# View hook details
pyworkflow hooks info <token>

# Resume interactively (TypedHook only - prompts for fields)
pyworkflow hooks resume

# Resume with explicit payload
pyworkflow hooks resume <token> --payload '{"approved": true, "reviewer": "admin"}'

# Resume with payload from file
pyworkflow hooks resume <token> --payload-file approval.json

Programmatically

from pyworkflow import resume_hook

# Resume a hook with payload
result = await resume_hook(
    token="run_abc123:hook_manager_approval_1",
    payload={"approved": True, "reviewer": "admin@example.com"},
)

if result.success:
    print(f"Hook resumed, workflow continuing")
else:
    print(f"Failed: {result.error}")

Token Format

Tokens are composite identifiers in the format run_id:hook_id:
run_abc123:hook_manager_approval_1
└────┬────┘ └─────────┬──────────┘
  run_id         hook_id
This self-describing format allows the system to route the resumption to the correct workflow run.

Configuration Options

OptionTypeDescription
timeoutstr | intMaximum wait time before hook expires. String format ("24h", "7d") or seconds.
on_createdCallable[[str], Awaitable[None]]Async callback invoked with token when hook is created.
payload_schemaType[BaseModel]Pydantic model for payload validation (used by simple hook() only).

Timeout Examples

# String format (recommended)
await approval_hook(timeout="24h")    # 24 hours
await approval_hook(timeout="7d")     # 7 days
await approval_hook(timeout="1h30m")  # 1 hour 30 minutes

# Integer (seconds)
await approval_hook(timeout=3600)     # 1 hour

# No timeout (waits indefinitely)
await approval_hook()

on_created Callback

The on_created callback is called with the hook token when the hook is created. Use it to notify external systems:
async def notify_approver(token: str):
    await send_slack_message(
        channel="#approvals",
        text=f"Approval needed! Resume: `pyworkflow hooks resume {token}`"
    )
    await send_email(
        to="manager@example.com",
        subject="Approval Required",
        body=f"Click to approve: https://app.example.com/approve?token={token}"
    )

approval = await approval_hook(
    timeout="7d",
    on_created=notify_approver,
)

Use Cases

Human Approval Workflows

@workflow()
async def expense_approval(expense_id: str, amount: float):
    expense = await prepare_expense(expense_id)

    if amount > 1000:
        # High-value expenses need manager approval
        approval = await manager_approval_hook(timeout="48h")
        if not approval.approved:
            return await reject_expense(expense, approval.reason)

    return await process_expense(expense)

Multi-Level Approval

@workflow()
async def contract_approval(contract_id: str):
    contract = await prepare_contract(contract_id)

    # Level 1: Manager approval
    manager = await manager_hook(timeout="24h")
    if not manager.approved:
        return await reject_contract(contract, "Manager rejected")

    # Level 2: Legal review
    legal = await legal_hook(timeout="72h")
    if not legal.approved:
        return await reject_contract(contract, "Legal rejected")

    # Level 3: Executive sign-off
    executive = await executive_hook(timeout="7d")
    if not executive.approved:
        return await reject_contract(contract, "Executive rejected")

    return await execute_contract(contract)

Webhook Integration

@workflow()
async def payment_processing(order_id: str):
    order = await create_order(order_id)

    async def setup_webhook(token: str):
        # Register token with payment provider
        await payment_provider.register_webhook(
            callback_url=f"https://api.example.com/hooks/{token}",
        )

    # Wait for payment confirmation from external provider
    payment = await payment_confirmation_hook(
        timeout="1h",
        on_created=setup_webhook,
    )

    if payment.status == "completed":
        return await fulfill_order(order)
    else:
        return await cancel_order(order, payment.error)

User Confirmation

@workflow()
async def account_deletion(user_id: str):
    user = await get_user(user_id)

    async def send_confirmation_email(token: str):
        await send_email(
            to=user.email,
            subject="Confirm Account Deletion",
            body=f"Click to confirm: https://app.example.com/confirm?token={token}"
        )

    # Wait for user to confirm deletion
    confirmation = await confirmation_hook(
        timeout="24h",
        on_created=send_confirmation_email,
    )

    if confirmation.confirmed:
        return await delete_user(user_id)
    else:
        return {"status": "cancelled", "user_id": user_id}

Best Practices

TypedHook provides validation, IDE support, and CLI interactive prompts:
# Good: Typed hook with Pydantic
class ApprovalPayload(BaseModel):
    approved: bool
    reviewer: str

approval_hook = define_hook("approval", ApprovalPayload)

# Avoid in production: Untyped hook
approval = await hook("approval")
Always set timeouts to prevent workflows from waiting indefinitely:
# Good: Set reasonable timeout
await approval_hook(timeout="7d")

# Avoid: No timeout (waits forever)
await approval_hook()
Handle expiration gracefully in your workflow logic.
Hook names should clearly indicate their purpose:
# Good: Clear purpose
manager_approval_hook = define_hook("manager_approval", ApprovalPayload)
payment_confirmation_hook = define_hook("payment_confirmation", PaymentPayload)

# Avoid: Vague names
hook1 = define_hook("hook1", Payload)
Use the on_created callback to trigger notifications:
async def notify_systems(token: str):
    await send_slack_notification(token)
    await send_email_notification(token)
    await register_webhook(token)

approval = await approval_hook(
    timeout="24h",
    on_created=notify_systems,
)
Catch and handle hook-related exceptions:
from pyworkflow import HookExpiredError, HookNotFoundError

try:
    result = await resume_hook(token, payload)
except HookExpiredError:
    print("Hook has expired")
except HookNotFoundError:
    print("Hook not found")

Testing Hooks

Use MockContext to test workflows with hooks without actual suspension:
import asyncio
from pyworkflow import MockContext, set_context

def test_approval_workflow():
    # Create mock context with predefined hook responses
    ctx = MockContext(
        run_id="test_run",
        workflow_name="approval_workflow",
        mock_hooks={
            "manager_approval": {
                "approved": True,
                "reviewer": "test@example.com",
                "comments": "Approved in test",
            }
        }
    )
    set_context(ctx)

    try:
        # Run workflow - hook returns mock response immediately
        result = asyncio.run(approval_workflow("order-123"))

        # Verify workflow completed correctly
        assert result["status"] == "fulfilled"
        assert ctx.hook_count == 1
    finally:
        set_context(None)

Testing Multiple Hooks

def test_multi_approval_workflow():
    ctx = MockContext(
        run_id="test_run",
        workflow_name="multi_approval",
        mock_hooks={
            "manager_approval": {"approved": True, "approver": "manager"},
            "finance_approval": {"approved": True, "approver": "finance"},
        }
    )
    set_context(ctx)

    try:
        result = asyncio.run(multi_approval_workflow("order-123"))
        assert result["status"] == "fulfilled"
        assert ctx.hook_count == 2
    finally:
        set_context(None)

Hook Events

Hooks generate events that are stored in the event log:
Event TypeWhenData
hook.createdHook is awaitedhook_id, token, name, expires_at, schema
hook.receivedHook is resumedhook_id, payload
hook.expiredTimeout reachedhook_id
hook.disposedHook cleaned uphook_id
View hook events for a run:
pyworkflow runs logs <run_id> --type hook

Next Steps