What is a Workflow?

A workflow is the top-level orchestration function that coordinates multiple steps, handles business logic, and can pause for extended periods using sleep or webhooks. Workflows are the entry point for your business processes.
from pyworkflow import workflow, step, start, sleep

@workflow()
async def order_processing(order_id: str):
    # Validate and process the order
    # (validate_order and the other helpers awaited here are assumed to be @step functions defined elsewhere)
    order = await validate_order(order_id)
    payment = await process_payment(order)

    # Wait for fulfillment
    await sleep("2h")

    # Ship and notify
    await create_shipment(order)
    await send_confirmation(order)

    return {"status": "completed", "order_id": order_id}

Key Characteristics

Durable

Workflows survive crashes, restarts, and deployments. State is preserved through event sourcing.

Suspendable

Workflows can pause for minutes, hours, or days without consuming resources.

Distributed

Workflows execute across Celery workers, enabling horizontal scaling.

Deterministic

Workflows can be replayed from any point using the recorded event log.

Creating Workflows

Workflows can be defined with a decorator or as a class; the decorator form is shown here:
from pyworkflow import workflow

@workflow()
async def my_workflow(user_id: str, amount: float):
    # Your workflow logic here
    result = await some_step(user_id, amount)
    return result

Configuration Options

Pass configuration options to the @workflow() decorator:
@workflow(
    name="custom_workflow_name",  # Override the function name
    max_duration="24h",           # Maximum workflow runtime
    max_retries=3                 # Retry the entire workflow on failure
)
async def my_workflow():
    pass
Option        Type  Default              Description
name          str   Function/class name  Unique identifier for the workflow
max_duration  str   "7d"                 Maximum time the workflow can run
max_retries   int   0                    Number of times to retry the entire workflow

Starting Workflows

Synchronous Start

The start() function is an ordinary synchronous call: it dispatches the workflow to Celery and returns a run ID immediately, without waiting for the run to finish:
from pyworkflow import start

# Start the workflow (non-blocking)
run_id = start(order_processing, order_id="ORD-123")
print(f"Workflow started with ID: {run_id}")

With Idempotency Key

Prevent duplicate workflow executions:
run_id = start(
    order_processing,
    order_id="ORD-123",
    idempotency_key="order-ORD-123"
)

# Calling again with same key returns the same run_id
run_id_2 = start(
    order_processing,
    order_id="ORD-123",
    idempotency_key="order-ORD-123"
)

assert run_id == run_id_2  # True - same workflow

Workflow Lifecycle

┌─────────────┐
│   PENDING   │  Workflow created, waiting to start
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   RUNNING   │  Workflow is executing
└──────┬──────┘
       │
       ├────────────────┐
       │                │
       ▼                ▼
┌─────────────┐  ┌─────────────┐
│  SUSPENDED  │  │   FAILED    │  Error occurred
└──────┬──────┘  └─────────────┘
       │
       │ (sleep ends / webhook received)
       ▼
┌─────────────┐
│   RUNNING   │  Resumed execution
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  COMPLETED  │  Workflow finished successfully
└─────────────┘
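
To make these states concrete, here is a minimal sketch that uses only the workflow, sleep, and start helpers shown earlier; the two step functions it awaits (provision_account, send_welcome_email) are hypothetical placeholders for your own @step functions:
from pyworkflow import workflow, start, sleep

@workflow()
async def onboarding(user_id: str):
    # RUNNING: the workflow body executes on a Celery worker
    await provision_account(user_id)       # hypothetical @step

    # SUSPENDED: the run is parked for a day without holding a worker
    await sleep("24h")

    # RUNNING: execution resumes once the sleep ends
    await send_welcome_email(user_id)      # hypothetical @step

    # COMPLETED: returning a value finishes the run
    return {"user_id": user_id, "status": "onboarded"}

# PENDING: start() records the run and dispatches it to Celery
run_id = start(onboarding, user_id="user-42")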

Workflow Context

Inside a workflow, you can access the execution context:
from pyworkflow import workflow
from pyworkflow.core.context import get_current_context

@workflow()
async def my_workflow():
    ctx = get_current_context()

    print(f"Run ID: {ctx.run_id}")
    print(f"Workflow: {ctx.workflow_name}")
    print(f"Started at: {ctx.started_at}")

    # Access step results from replay
    previous_result = ctx.step_results.get("step_id")

Error Handling

Workflows handle errors based on their type: a FatalError fails the workflow immediately, while a RetryableError triggers a retry of the entire workflow, up to max_retries times:
from pyworkflow import workflow, FatalError, RetryableError

@workflow(max_retries=3)
async def my_workflow():
    # risky_operation, ValidationError, and TemporaryError are assumed to be
    # application-defined; FatalError and RetryableError come from pyworkflow
    try:
        result = await risky_operation()
        return result
    except ValidationError as e:
        # Fatal errors stop the workflow immediately
        raise FatalError(f"Invalid input: {e}")
    except TemporaryError as e:
        # Retryable errors trigger workflow retry
        raise RetryableError(f"Temporary failure: {e}")

Best Practices

Each workflow should handle a single business process. If a workflow is getting complex, consider breaking it into smaller workflows that call each other (see the sketch after these tips).
Workflow names should clearly describe their purpose: process_order, onboard_user, send_notification_sequence.
Use idempotency keys for workflows that shouldn’t run twice for the same input. This prevents duplicate processing during retries.
Use max_duration to prevent workflows from running indefinitely. Consider the longest possible execution path.
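
The sketch below shows one way that decomposition could look. This page does not document a dedicated child-workflow API, so the sketch simply assumes that start() can also be called from inside a workflow body to dispatch the smaller workflows; payment_workflow, fulfillment_workflow, and order_pipeline are hypothetical names. Treat it as an illustration of the structure, not as the confirmed mechanism:
from pyworkflow import workflow, start, sleep

@workflow()
async def payment_workflow(order_id: str):
    ...  # validate the order and charge the customer

@workflow()
async def fulfillment_workflow(order_id: str):
    await sleep("2h")  # wait for the warehouse
    ...  # ship and notify

@workflow()
async def order_pipeline(order_id: str):
    # Assumption: smaller workflows are dispatched with start(); the
    # idempotency keys prevent duplicates if this orchestrator is retried.
    start(payment_workflow, order_id=order_id,
          idempotency_key=f"payment-{order_id}")
    start(fulfillment_workflow, order_id=order_id,
          idempotency_key=f"fulfillment-{order_id}")
    return {"status": "dispatched", "order_id": order_id}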

Next Steps