Skip to main content

Installation

Install PyWorkflow using pip:
pip install pyworkflow

Prerequisites

PyWorkflow requires Redis and Celery workers for distributed execution.

Your First Workflow

Create a simple onboarding workflow that sends emails with delays:
from pyworkflow import workflow, step, start, sleep

@step()
async def send_welcome_email(user_id: str):
    """Send a welcome email to the new user."""
    print(f"Sending welcome email to user {user_id}")
    return f"Email sent to {user_id}"

@step()
async def send_tips_email(user_id: str):
    """Send helpful tips after the welcome period."""
    print(f"Sending tips email to user {user_id}")
    return f"Tips sent to {user_id}"

@workflow()
async def onboarding_workflow(user_id: str):
    # Send welcome email immediately
    await send_welcome_email(user_id)

    # Sleep for 1 day - workflow suspends, zero resources used
    await sleep("1d")

    # Automatically resumes after 1 day
    await send_tips_email(user_id)

    return "Onboarding complete"

# Start the workflow
run_id = start(onboarding_workflow, user_id="user_123")
print(f"Workflow started: {run_id}")

What Happens Under the Hood

1

Workflow Starts

Your workflow is dispatched to an available Celery worker.
2

Welcome Email Sent

The send_welcome_email step executes and the result is recorded.
3

Workflow Suspends

When sleep("1d") is called, the workflow suspends and the worker is freed. Zero resources are consumed during the sleep period.
4

Automatic Resumption

After 1 day, Celery Beat automatically schedules the workflow to resume.
5

Tips Email Sent

The workflow picks up where it left off, sending the tips email.
6

Workflow Completes

The final result is recorded and the workflow is marked as complete.

Key Concepts

Adding Error Handling

Make your workflows fault-tolerant with automatic retries:
from pyworkflow import step, RetryableError, FatalError

@step(max_retries=3, retry_delay="exponential")
async def call_payment_api(amount: float):
    """Process payment with automatic retry on failure."""
    try:
        result = await payment_gateway.charge(amount)
        return result
    except PaymentGatewayTimeoutError:
        # Retry with exponential backoff
        raise RetryableError("Gateway timeout", retry_after="10s")
    except InsufficientFundsError:
        # Don't retry - this is a permanent failure
        raise FatalError("Insufficient funds")
Use RetryableError for transient failures (network issues, timeouts) and FatalError for permanent failures (invalid input, business rule violations).

Running in Parallel

Execute multiple steps concurrently using asyncio.gather():
import asyncio
from pyworkflow import workflow, step

@step()
async def fetch_user(user_id: str):
    return {"id": user_id, "name": "Alice"}

@step()
async def fetch_orders(user_id: str):
    return [{"id": "ORD-1"}, {"id": "ORD-2"}]

@step()
async def fetch_recommendations(user_id: str):
    return ["Product A", "Product B"]

@workflow()
async def dashboard_data(user_id: str):
    # Fetch all data in parallel
    user, orders, recommendations = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_recommendations(user_id)
    )

    return {
        "user": user,
        "orders": orders,
        "recommendations": recommendations
    }

Next Steps