
Overview

PyWorkflow configuration determines how workflows execute: which runtime to use, where to store state, and default behaviors. Configuration can come from multiple sources with a clear priority order.

  • Config File: Zero-code configuration via pyworkflow.config.yaml
  • Programmatic: Configure in Python code with pyworkflow.configure()
  • Per-Call Override: Override settings per start() call
  • Environment Variables: Configure via environment for deployment flexibility

Configuration Priority

When you call pyworkflow.start(), configuration is resolved in this order:
| Priority | Source | Description |
| --- | --- | --- |
| 1 (highest) | start() parameters | Explicit runtime=, durable=, storage= arguments |
| 2 | pyworkflow.configure() | Values set programmatically |
| 3 | pyworkflow.config.yaml | Config file in the current directory |
| 4 (lowest) | Defaults | runtime="local", durable=False |
When the config file sets runtime: celery, PyWorkflow automatically sets durable=True, since Celery requires durable mode.

Config File

The simplest way to configure PyWorkflow is with a pyworkflow.config.yaml file in your project directory:
# pyworkflow.config.yaml

# Module containing workflow definitions (for CLI discovery)
module: myapp.workflows

# Runtime: "celery" for distributed, "local" for in-process
runtime: celery

# Storage backend for durable workflows
storage:
  backend: file          # "file" or "memory"
  path: ./workflow_data  # Path for file backend

# Celery broker settings (when runtime: celery)
celery:
  broker: redis://localhost:6379/0
  result_backend: redis://localhost:6379/1

Automatic Loading

The config file is automatically loaded when:
  1. CLI commands - pyworkflow worker run, pyworkflow workflows list, etc.
  2. Python code - When you call pyworkflow.start() or pyworkflow.get_config()
# Your Python code - no explicit configuration needed!
import asyncio
import pyworkflow
from myapp.workflows import order_workflow

async def main():
    # Automatically uses settings from pyworkflow.config.yaml:
    # - runtime: celery
    # - durable: True (implied by celery runtime)
    # - storage: FileStorageBackend("./workflow_data")
    run_id = await pyworkflow.start(order_workflow, "order-123", 99.99)
    print(f"Started workflow: {run_id}")

asyncio.run(main())

Config File Location

PyWorkflow looks for pyworkflow.config.yaml in the current working directory (where you run your Python script or CLI command from).
myproject/
├── pyworkflow.config.yaml  # Config file here
├── myapp/
│   └── workflows.py
└── scripts/
    └── run_workflow.py

# Run from project root - config is found
cd myproject
python scripts/run_workflow.py  # ✓ Uses pyworkflow.config.yaml

# Run from scripts directory - config NOT found (uses defaults)
cd myproject/scripts
python run_workflow.py  # ✗ No config file in ./scripts/

Always run your scripts from the directory containing pyworkflow.config.yaml, or use programmatic configuration if you need more control.
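
If you cannot control the working directory your script is launched from, one option is to skip the config file and configure PyWorkflow programmatically at startup. A minimal sketch, assuming the Celery/file-storage setup shown above (the storage path and broker URL are placeholders):

# scripts/run_workflow.py
import asyncio

import pyworkflow
from pyworkflow.storage import FileStorageBackend
from myapp.workflows import order_workflow

# Explicit configuration, so the script behaves the same no matter
# where it is run from (placeholder path and broker URL)
pyworkflow.configure(
    default_runtime="celery",
    default_durable=True,
    storage=FileStorageBackend("/absolute/path/to/workflow_data"),
    celery_broker="redis://localhost:6379/0",
)

async def main():
    run_id = await pyworkflow.start(order_workflow, "order-123", 99.99)
    print(f"Started workflow: {run_id}")

asyncio.run(main())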

Project Structure

PyWorkflow supports two ways to organize your workflow code. The module field in your config file tells PyWorkflow where to find and import your workflows.

Option 1: Single File

For simple projects, define all workflows in a single file:
myproject/
├── pyworkflow.config.yaml
└── workflows.py           # All workflows here

# pyworkflow.config.yaml
module: workflows

# workflows.py
from pyworkflow import workflow, step

@step()
async def validate_order(order_id: str) -> dict:
    return {"order_id": order_id, "valid": True}

@workflow()
async def process_order(order_id: str) -> dict:
    result = await validate_order(order_id)
    return {"status": "completed", **result}

@workflow()
async def send_notification(user_id: str, message: str) -> dict:
    return {"sent": True, "user_id": user_id}

Option 2: Package Directory

For larger projects, organize workflows into a package with multiple files:
myproject/
├── pyworkflow.config.yaml
└── workflows/
    ├── __init__.py        # Exports all workflows
    ├── orders.py          # Order-related workflows
    └── notifications.py   # Notification workflows

# pyworkflow.config.yaml
module: workflows

# workflows/__init__.py
"""Export all workflows from the package."""
from .orders import process_order, refund_order
from .notifications import send_notification, send_bulk_notifications

__all__ = [
    "process_order",
    "refund_order",
    "send_notification",
    "send_bulk_notifications",
]

# workflows/orders.py
from pyworkflow import workflow, step

@step()
async def validate_order(order_id: str) -> dict:
    return {"order_id": order_id, "valid": True}

@step()
async def process_payment(order_id: str, amount: float) -> dict:
    return {"order_id": order_id, "paid": True}

@workflow()
async def process_order(order_id: str, amount: float) -> dict:
    validation = await validate_order(order_id)
    payment = await process_payment(order_id, amount)
    return {"status": "completed", "validation": validation, "payment": payment}

@workflow()
async def refund_order(order_id: str) -> dict:
    return {"order_id": order_id, "refunded": True}

# workflows/notifications.py
from pyworkflow import workflow, step, sleep

@step()
async def send_email(to: str, subject: str, body: str) -> dict:
    return {"sent": True, "to": to}

@workflow()
async def send_notification(user_id: str, message: str) -> dict:
    result = await send_email(f"{user_id}@example.com", "Notification", message)
    return result

@workflow()
async def send_bulk_notifications(user_ids: list[str], message: str) -> dict:
    results = []
    for user_id in user_ids:
        result = await send_email(f"{user_id}@example.com", "Notification", message)
        results.append(result)
        await sleep("1s")  # Rate limiting
    return {"sent": len(results)}

How Discovery Works

When PyWorkflow imports your module:
  1. Module Import: Python imports the specified module (e.g., workflows or workflows/__init__.py)
  2. Decorator Registration: The @workflow and @step decorators automatically register functions in the global registry
  3. Explicit Exports: For package directories, __init__.py imports trigger the decorators
The key is that importing your module must trigger the @workflow decorators to run. With a package directory, make sure __init__.py imports all workflow functions.
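
A quick way to confirm that registration worked is to list the workflows the CLI can see; run this from the directory containing pyworkflow.config.yaml:

cd myproject
pyworkflow workflows list   # imports the configured module and lists the registered workflows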

Nested Packages

For large applications, you can nest packages deeper:
myproject/
├── pyworkflow.config.yaml
└── myapp/
    └── workflows/
        ├── __init__.py
        ├── orders/
        │   ├── __init__.py
        │   └── processing.py
        └── notifications/
            ├── __init__.py
            └── email.py

# pyworkflow.config.yaml
module: myapp.workflows

Each nested __init__.py should re-export workflows from its submodules to ensure they are discovered when the top-level module is imported.
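
For example, the nested packages might be wired together like this (a sketch: the exact workflow names and their placement in processing.py and email.py are assumptions, only the re-export pattern matters):

# myapp/workflows/orders/__init__.py
# Assumes process_order is defined in orders/processing.py
from .processing import process_order

__all__ = ["process_order"]

# myapp/workflows/__init__.py
# Importing myapp.workflows pulls in every subpackage, so the
# @workflow decorators run and every workflow gets registered
from .orders import process_order
from .notifications import send_notification

__all__ = ["process_order", "send_notification"]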

Programmatic Configuration

For more control, configure PyWorkflow in your Python code:
import pyworkflow
from pyworkflow.storage import FileStorageBackend

# Configure once at application startup
pyworkflow.configure(
    default_runtime="celery",      # or "local"
    default_durable=True,
    storage=FileStorageBackend("./workflow_data"),
    celery_broker="redis://localhost:6379/0",
)

# All subsequent start() calls use these defaults
async def main():
    run_id = await pyworkflow.start(my_workflow, arg1, arg2)

Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| default_runtime | str | "local" | Default runtime: "local" or "celery" |
| default_durable | bool | False | Whether workflows are durable by default |
| default_retries | int | 3 | Default retry count for steps |
| storage | StorageBackend | None | Storage backend instance |
| celery_broker | str | None | Celery broker URL |
| aws_region | str | None | AWS region (for Lambda runtimes) |
| event_soft_limit | int | 10,000 | Event count to start logging warnings |
| event_hard_limit | int | 50,000 | Event count to terminate workflow |
| event_warning_interval | int | 100 | Events between warnings after soft limit |
Event limit settings should not be modified unless you fully understand the implications. See Limitations for details.
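
Most of these options appear in the example above; the one not yet shown is default_retries. A minimal sketch (the value is a placeholder, and the event_* limits are deliberately left at their defaults):

import pyworkflow

# Only default_retries is changed; event_* limits stay at their defaults
pyworkflow.configure(
    default_retries=5,   # each step may be retried up to 5 times by default
)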

Storage Backends

from pyworkflow.storage import FileStorageBackend, InMemoryStorageBackend

# File-based storage (persistent)
pyworkflow.configure(
    storage=FileStorageBackend("./workflow_data")
)

# In-memory storage (for testing)
pyworkflow.configure(
    storage=InMemoryStorageBackend()
)

Per-Call Overrides

Override configuration for individual start() calls:
import pyworkflow

# Override runtime for this specific call
run_id = await pyworkflow.start(
    my_workflow,
    arg1, arg2,
    runtime="local",     # Override: run locally instead of Celery
    durable=False,       # Override: transient execution
)

# Use custom storage for this call
from pyworkflow.storage import InMemoryStorageBackend

run_id = await pyworkflow.start(
    my_workflow,
    arg1, arg2,
    storage=InMemoryStorageBackend(),  # Override storage
)

Parameter Priority Example

import pyworkflow

# Global config: runtime="local", durable=False
pyworkflow.configure(default_runtime="local", default_durable=False)

# This call uses local runtime, transient (from config)
await pyworkflow.start(workflow_a, "arg")

# This call overrides to celery runtime, durable
await pyworkflow.start(workflow_b, "arg", runtime="celery", durable=True)

# This call uses local runtime (from config), but durable=True (override)
await pyworkflow.start(workflow_c, "arg", durable=True)

Environment Variables

Environment variables provide deployment flexibility:
| Variable | Description |
| --- | --- |
| PYWORKFLOW_MODULE | Module for workflow discovery |
| PYWORKFLOW_RUNTIME | Default runtime (local or celery) |
| PYWORKFLOW_STORAGE_BACKEND | Storage backend type |
| PYWORKFLOW_STORAGE_PATH | Path for file storage |
| PYWORKFLOW_CELERY_BROKER | Celery broker URL |
| PYWORKFLOW_CELERY_RESULT_BACKEND | Celery result backend URL |
| PYWORKFLOW_DISCOVER | Modules to import for workflow discovery |
# Example: Production deployment with environment variables
export PYWORKFLOW_RUNTIME=celery
export PYWORKFLOW_CELERY_BROKER=redis://redis-cluster:6379/0
export PYWORKFLOW_STORAGE_PATH=/data/workflows

python -m myapp.main

Configuration Patterns

Development vs Production

# pyworkflow.config.yaml (dev)
module: myapp.workflows
runtime: local  # Run in-process for fast iteration
storage:
  backend: memory  # No persistence needed
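
A production counterpart is a sketch of the same file using the distributed settings from earlier sections (adjust the broker URLs and storage path for your environment):

# pyworkflow.config.yaml (prod)
module: myapp.workflows
runtime: celery             # Distributed execution (implies durable)
storage:
  backend: file             # Persist workflow state
  path: /data/workflows     # Placeholder path
celery:
  broker: redis://redis-cluster:6379/0         # Placeholder URLs
  result_backend: redis://redis-cluster:6379/1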

Testing Configuration

import pytest
import pyworkflow
from pyworkflow.storage import InMemoryStorageBackend

@pytest.fixture(autouse=True)
def reset_config():
    """Reset PyWorkflow config before each test."""
    pyworkflow.reset_config()
    pyworkflow.configure(
        default_runtime="local",
        default_durable=True,
        storage=InMemoryStorageBackend(),
    )
    yield
    pyworkflow.reset_config()
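
With that fixture in place, a test can start a workflow directly. This sketch assumes pytest-asyncio (or an equivalent async test plugin) and the single-file process_order workflow from earlier:

import pytest
import pyworkflow
from workflows import process_order  # the single-file layout shown earlier

@pytest.mark.asyncio
async def test_process_order_completes():
    # Runs on the local runtime against the in-memory backend from the fixture
    run_id = await pyworkflow.start(process_order, "order-123")
    assert run_id is not None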

Conditional Configuration

import os
import pyworkflow

if os.getenv("ENVIRONMENT") == "production":
    pyworkflow.configure(
        default_runtime="celery",
        default_durable=True,
        celery_broker=os.getenv("CELERY_BROKER_URL"),
    )
else:
    pyworkflow.configure(
        default_runtime="local",
        default_durable=False,
    )

Fault Tolerance Settings

Configure auto recovery behavior for workflows that experience worker crashes.
# pyworkflow.config.yaml

recovery:
  recover_on_worker_loss: true
  max_recovery_attempts: 5

Recovery Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| recover_on_worker_loss | bool | True (durable) | Enable automatic recovery when a worker crashes |
| max_recovery_attempts | int | 3 | Maximum number of recovery attempts before marking as failed |
For durable workflows, recovery replays events to restore state. For transient workflows, recovery restarts from the beginning. See Fault Tolerance for details.

Reading Current Configuration

Access the current configuration programmatically:
from pyworkflow.config import get_config

config = get_config()

print(f"Runtime: {config.default_runtime}")
print(f"Durable: {config.default_durable}")
print(f"Storage: {config.storage}")
print(f"Celery Broker: {config.celery_broker}")

Worker Configuration

When running Celery workers, you can configure worker behavior through CLI options. See the CLI Guide for all available options including:
  • Autoscaling: Automatically scale worker processes based on load
  • Task limits: Control tasks per child and prefetch multiplier
  • Time limits: Set hard and soft time limits for tasks
  • Celery passthrough: Forward arbitrary arguments to Celery
# Example: Production worker with autoscaling and task limits
pyworkflow worker run --step --autoscale 2,10 --max-tasks-per-child 100

# Example: Pass arbitrary Celery options
pyworkflow worker run -- --max-memory-per-child=200000

Next Steps