What are Hooks?
Hooks allow workflows to suspend execution and wait for external events . Unlike sleep() which resumes after a time duration, hooks wait for an external system or user to provide data before continuing.
from pydantic import BaseModel
from pyworkflow import define_hook, workflow
class ApprovalPayload ( BaseModel ):
approved: bool
reviewer: str
comments: str | None = None
# Define a typed hook with Pydantic validation
approval_hook = define_hook( "manager_approval" , ApprovalPayload)
@workflow ()
async def approval_workflow ( order_id : str ):
order = await prepare_order(order_id)
# Workflow suspends here - waits for external approval
approval: ApprovalPayload = await approval_hook( timeout = "7d" )
if approval.approved:
return await fulfill_order(order)
else :
return await cancel_order(order, approval.comments)
How It Works
Workflow Execution
│
▼
┌───────────────┐
│ Execute Steps │
└───────┬───────┘
│
▼
┌───────────────────┐
│ await hook(...) │
└───────┬───────────┘
│
├─── 1. Generate token (run_id:hook_id)
│
├─── 2. Record hook_created event
│
├─── 3. Store hook with schema (for CLI)
│
├─── 4. Call on_created callback with token
│
├─── 5. Raise SuspensionSignal
│
└─── 6. Worker is freed
│
│ ... waiting for external event ...
│
▼
┌─────────────────────┐
│ External system │
│ calls resume_hook() │
│ with token + payload│
└────────┬────────────┘
│
├─── 7. Validate payload (if typed)
│
├─── 8. Record hook_received event
│
└─── 9. Schedule workflow resumption
│
▼
┌───────────────┐
│ Replay events │
│ Resume work │
└───────────────┘
Choosing a Hook Type
PyWorkflow offers two ways to define hooks:
Feature TypedHook (Recommended) Simple Hook Type safety Full Pydantic validation Dict with any keys CLI prompts Interactive field-by-field JSON payload only IDE support Autocomplete, type hints None Schema stored Yes (enables CLI features) No Best for Production workflows Quick prototypes
We recommend TypedHook for all production workflows. The Pydantic schema enables CLI interactive prompts, payload validation, and better IDE support.
TypedHook with Pydantic (Recommended)
TypedHook combines hook suspension with Pydantic validation for type-safe payloads.
Defining a Typed Hook
from pydantic import BaseModel
from typing import Optional
from pyworkflow import define_hook
# 1. Define the payload schema
class ApprovalPayload ( BaseModel ):
approved: bool
reviewer: str
comments: Optional[ str ] = None
# 2. Create the typed hook
approval_hook = define_hook( "manager_approval" , ApprovalPayload)
Using in a Workflow
from pyworkflow import workflow
@workflow ()
async def order_approval ( order_id : str ):
order = await prepare_order(order_id)
async def on_hook_created ( token : str ):
print ( f "Approval needed! Token: { token } " )
# Send token to external system, email, Slack, etc.
# Wait for approval - returns validated ApprovalPayload
approval: ApprovalPayload = await approval_hook(
timeout = "7d" ,
on_created = on_hook_created,
)
# Type-safe access to payload fields
if approval.approved:
print ( f "Approved by { approval.reviewer } " )
return await fulfill_order(order)
else :
return await cancel_order(order, approval.comments or "Rejected" )
CLI Interactive Resume
When you run pyworkflow hooks resume, the CLI uses the stored Pydantic schema to prompt for each field:
$ pyworkflow hooks resume
? Select a pending hook to resume:
> manager_approval (run_abc123:hook_manager_approval_1)
? approved ( bool ): yes
? reviewer ( str ): admin@example.com
? comments ( str, optional ): Looks good !
Hook resumed successfully.
Simple Hook
For quick prototyping or when you don’t need typed payloads, use the hook() function directly.
from pyworkflow import hook, workflow
@workflow ()
async def simple_approval ( order_id : str ):
order = await prepare_order(order_id)
async def on_hook_created ( token : str ):
print ( f "Resume with: pyworkflow hooks resume { token } " )
# Wait for any payload (untyped dict)
approval = await hook(
"approval" ,
timeout = "24h" ,
on_created = on_hook_created,
)
# Payload is a dict - no type safety
if approval.get( "approved" ):
return await fulfill_order(order)
else :
return await cancel_order(order, approval.get( "reason" , "Rejected" ))
Simple hooks don’t enable CLI interactive prompts. You must provide the full JSON payload when resuming.
Resuming Hooks
Using the CLI
# List all pending hooks
pyworkflow hooks list --status pending
# View hook details
pyworkflow hooks info < toke n >
# Resume interactively (TypedHook only - prompts for fields)
pyworkflow hooks resume
# Resume with explicit payload
pyworkflow hooks resume < toke n > --payload '{"approved": true, "reviewer": "admin"}'
# Resume with payload from file
pyworkflow hooks resume < toke n > --payload-file approval.json
Programmatically
from pyworkflow import resume_hook
# Resume a hook with payload
result = await resume_hook(
token = "run_abc123:hook_manager_approval_1" ,
payload = { "approved" : True , "reviewer" : "admin@example.com" },
)
if result.success:
print ( f "Hook resumed, workflow continuing" )
else :
print ( f "Failed: { result.error } " )
Tokens are composite identifiers in the format run_id:hook_id:
run_abc123:hook_manager_approval_1
└────┬────┘ └─────────┬──────────┘
run_id hook_id
This self-describing format allows the system to route the resumption to the correct workflow run.
Configuration Options
Option Type Description timeoutstr | intMaximum wait time before hook expires. String format ("24h", "7d") or seconds. on_createdCallable[[str], Awaitable[None]]Async callback invoked with token when hook is created. payload_schemaType[BaseModel]Pydantic model for payload validation (used by simple hook() only).
Timeout Examples
# String format (recommended)
await approval_hook( timeout = "24h" ) # 24 hours
await approval_hook( timeout = "7d" ) # 7 days
await approval_hook( timeout = "1h30m" ) # 1 hour 30 minutes
# Integer (seconds)
await approval_hook( timeout = 3600 ) # 1 hour
# No timeout (waits indefinitely)
await approval_hook()
on_created Callback
The on_created callback is called with the hook token when the hook is created. Use it to notify external systems:
async def notify_approver ( token : str ):
await send_slack_message(
channel = "#approvals" ,
text = f "Approval needed! Resume: `pyworkflow hooks resume { token } `"
)
await send_email(
to = "manager@example.com" ,
subject = "Approval Required" ,
body = f "Click to approve: https://app.example.com/approve?token= { token } "
)
approval = await approval_hook(
timeout = "7d" ,
on_created = notify_approver,
)
Use Cases
Human Approval Workflows
@workflow ()
async def expense_approval ( expense_id : str , amount : float ):
expense = await prepare_expense(expense_id)
if amount > 1000 :
# High-value expenses need manager approval
approval = await manager_approval_hook( timeout = "48h" )
if not approval.approved:
return await reject_expense(expense, approval.reason)
return await process_expense(expense)
Multi-Level Approval
@workflow ()
async def contract_approval ( contract_id : str ):
contract = await prepare_contract(contract_id)
# Level 1: Manager approval
manager = await manager_hook( timeout = "24h" )
if not manager.approved:
return await reject_contract(contract, "Manager rejected" )
# Level 2: Legal review
legal = await legal_hook( timeout = "72h" )
if not legal.approved:
return await reject_contract(contract, "Legal rejected" )
# Level 3: Executive sign-off
executive = await executive_hook( timeout = "7d" )
if not executive.approved:
return await reject_contract(contract, "Executive rejected" )
return await execute_contract(contract)
Webhook Integration
@workflow ()
async def payment_processing ( order_id : str ):
order = await create_order(order_id)
async def setup_webhook ( token : str ):
# Register token with payment provider
await payment_provider.register_webhook(
callback_url = f "https://api.example.com/hooks/ { token } " ,
)
# Wait for payment confirmation from external provider
payment = await payment_confirmation_hook(
timeout = "1h" ,
on_created = setup_webhook,
)
if payment.status == "completed" :
return await fulfill_order(order)
else :
return await cancel_order(order, payment.error)
User Confirmation
@workflow ()
async def account_deletion ( user_id : str ):
user = await get_user(user_id)
async def send_confirmation_email ( token : str ):
await send_email(
to = user.email,
subject = "Confirm Account Deletion" ,
body = f "Click to confirm: https://app.example.com/confirm?token= { token } "
)
# Wait for user to confirm deletion
confirmation = await confirmation_hook(
timeout = "24h" ,
on_created = send_confirmation_email,
)
if confirmation.confirmed:
return await delete_user(user_id)
else :
return { "status" : "cancelled" , "user_id" : user_id}
Best Practices
Use TypedHook for production workflows
TypedHook provides validation, IDE support, and CLI interactive prompts: # Good: Typed hook with Pydantic
class ApprovalPayload ( BaseModel ):
approved: bool
reviewer: str
approval_hook = define_hook( "approval" , ApprovalPayload)
# Avoid in production: Untyped hook
approval = await hook( "approval" )
Always set timeouts to prevent workflows from waiting indefinitely: # Good: Set reasonable timeout
await approval_hook( timeout = "7d" )
# Avoid: No timeout (waits forever)
await approval_hook()
Handle expiration gracefully in your workflow logic.
Use descriptive hook names
Hook names should clearly indicate their purpose: # Good: Clear purpose
manager_approval_hook = define_hook( "manager_approval" , ApprovalPayload)
payment_confirmation_hook = define_hook( "payment_confirmation" , PaymentPayload)
# Avoid: Vague names
hook1 = define_hook( "hook1" , Payload)
Notify external systems via on_created
Use the on_created callback to trigger notifications: async def notify_systems ( token : str ):
await send_slack_notification(token)
await send_email_notification(token)
await register_webhook(token)
approval = await approval_hook(
timeout = "24h" ,
on_created = notify_systems,
)
Handle hook errors gracefully
Catch and handle hook-related exceptions: from pyworkflow import HookExpiredError, HookNotFoundError
try :
result = await resume_hook(token, payload)
except HookExpiredError:
print ( "Hook has expired" )
except HookNotFoundError:
print ( "Hook not found" )
Testing Hooks
Use MockContext to test workflows with hooks without actual suspension:
import asyncio
from pyworkflow import MockContext, set_context
def test_approval_workflow ():
# Create mock context with predefined hook responses
ctx = MockContext(
run_id = "test_run" ,
workflow_name = "approval_workflow" ,
mock_hooks = {
"manager_approval" : {
"approved" : True ,
"reviewer" : "test@example.com" ,
"comments" : "Approved in test" ,
}
}
)
set_context(ctx)
try :
# Run workflow - hook returns mock response immediately
result = asyncio.run(approval_workflow( "order-123" ))
# Verify workflow completed correctly
assert result[ "status" ] == "fulfilled"
assert ctx.hook_count == 1
finally :
set_context( None )
Testing Multiple Hooks
def test_multi_approval_workflow ():
ctx = MockContext(
run_id = "test_run" ,
workflow_name = "multi_approval" ,
mock_hooks = {
"manager_approval" : { "approved" : True , "approver" : "manager" },
"finance_approval" : { "approved" : True , "approver" : "finance" },
}
)
set_context(ctx)
try :
result = asyncio.run(multi_approval_workflow( "order-123" ))
assert result[ "status" ] == "fulfilled"
assert ctx.hook_count == 2
finally :
set_context( None )
Hook Events
Hooks generate events that are stored in the event log:
Event Type When Data hook.createdHook is awaited hook_id, token, name, expires_at, schema hook.receivedHook is resumed hook_id, payload hook.expiredTimeout reached hook_id hook.disposedHook cleaned up hook_id
View hook events for a run:
pyworkflow runs logs < run_i d > --type hook
Next Steps