Overview
PyWorkflow configuration determines how workflows execute: which runtime to use, where to store
state, and default behaviors. Configuration can come from multiple sources with a clear priority order.
- Config File: Zero-code configuration via pyworkflow.config.yaml
- Programmatic: Configure in Python code with pyworkflow.configure()
- Per-Call Override: Override settings per start() call
- Environment Variables: Configure via environment variables for deployment flexibility
Configuration Priority
When you call pyworkflow.start(), configuration is resolved in this order:
| Priority | Source | Description |
|---|---|---|
| 1 (highest) | start() parameters | Explicit runtime=, durable=, storage= arguments |
| 2 | pyworkflow.configure() | Values set programmatically |
| 3 | pyworkflow.config.yaml | Config file in the current directory |
| 4 (lowest) | Defaults | runtime="local", durable=False |
When the config file selects the Celery runtime (runtime: celery), PyWorkflow automatically
sets durable=True, since Celery requires durable mode.
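For example, an explicit start() argument still wins over the config file (a sketch; it assumes a config file that sets runtime: celery and the order_workflow used later on this page):

import asyncio
import pyworkflow
from myapp.workflows import order_workflow

async def main():
    # The config file sets runtime: celery, but the explicit arguments
    # (priority 1) force this particular run to execute in-process and
    # without durability.
    run_id = await pyworkflow.start(
        order_workflow, "order-123", 99.99,
        runtime="local", durable=False,
    )
    print(run_id)

asyncio.run(main())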
Config File (Recommended)
The simplest way to configure PyWorkflow is with a pyworkflow.config.yaml file in your
project directory:
# pyworkflow.config.yaml

# Module containing workflow definitions (for CLI discovery)
module: myapp.workflows

# Runtime: "celery" for distributed, "local" for in-process
runtime: celery

# Storage backend for durable workflows
storage:
  backend: file          # "file" or "memory"
  path: ./workflow_data  # Path for file backend

# Celery broker settings (when runtime: celery)
celery:
  broker: redis://localhost:6379/0
  result_backend: redis://localhost:6379/1
Automatic Loading
The config file is automatically loaded when:
- CLI commands: pyworkflow worker run, pyworkflow workflows list, etc.
- Python code: when you call pyworkflow.start() or pyworkflow.get_config()
# Your Python code - no explicit configuration needed!
import asyncio
import pyworkflow
from myapp.workflows import order_workflow

async def main():
    # Automatically uses settings from pyworkflow.config.yaml:
    # - runtime: celery
    # - durable: True (implied by celery runtime)
    # - storage: FileStorageBackend("./workflow_data")
    run_id = await pyworkflow.start(order_workflow, "order-123", 99.99)
    print(f"Started workflow: {run_id}")

asyncio.run(main())
Config File Location
PyWorkflow looks for pyworkflow.config.yaml in the current working directory (where
you run your Python script or CLI command from).
myproject/
├── pyworkflow.config.yaml # Config file here
├── myapp/
│ └── workflows.py
└── scripts/
└── run_workflow.py
# Run from project root - config is found
cd myproject
python scripts/run_workflow.py # ✓ Uses pyworkflow.config.yaml
# Run from scripts directory - config NOT found (uses defaults)
cd myproject/scripts
python run_workflow.py # ✗ No config file in ./scripts/
Always run your scripts from the directory containing pyworkflow.config.yaml, or use
programmatic configuration if you need more control.
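If a script must run from somewhere else, it can configure itself explicitly instead of relying on the config file (a sketch; the absolute storage path is a placeholder):

# scripts/run_workflow.py
import pyworkflow
from pyworkflow.storage import FileStorageBackend

# Explicit configuration removes the dependency on the current working directory.
pyworkflow.configure(
    default_runtime="celery",
    default_durable=True,
    storage=FileStorageBackend("/absolute/path/to/workflow_data"),  # placeholder path
    celery_broker="redis://localhost:6379/0",
)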
Project Structure
PyWorkflow supports two ways to organize your workflow code. The module field in your
config file tells PyWorkflow where to find and import your workflows.
Option 1: Single File
For simple projects, define all workflows in a single file:
myproject/
├── pyworkflow.config.yaml
└── workflows.py # All workflows here
# pyworkflow.config.yaml
module: workflows
# workflows.py
from pyworkflow import workflow, step

@step()
async def validate_order(order_id: str) -> dict:
    return {"order_id": order_id, "valid": True}

@workflow()
async def process_order(order_id: str) -> dict:
    result = await validate_order(order_id)
    return {"status": "completed", **result}

@workflow()
async def send_notification(user_id: str, message: str) -> dict:
    return {"sent": True, "user_id": user_id}
Option 2: Package Directory (Recommended)
For larger projects, organize workflows into a package with multiple files:
myproject/
├── pyworkflow.config.yaml
└── workflows/
├── __init__.py # Exports all workflows
├── orders.py # Order-related workflows
└── notifications.py # Notification workflows
# pyworkflow.config.yaml
module: workflows
# workflows/__init__.py
"""Export all workflows from the package."""
from .orders import process_order, refund_order
from .notifications import send_notification, send_bulk_notifications

__all__ = [
    "process_order",
    "refund_order",
    "send_notification",
    "send_bulk_notifications",
]
# workflows/orders.py
from pyworkflow import workflow, step

@step()
async def validate_order(order_id: str) -> dict:
    return {"order_id": order_id, "valid": True}

@step()
async def process_payment(order_id: str, amount: float) -> dict:
    return {"order_id": order_id, "paid": True}

@workflow()
async def process_order(order_id: str, amount: float) -> dict:
    validation = await validate_order(order_id)
    payment = await process_payment(order_id, amount)
    return {"status": "completed", "validation": validation, "payment": payment}

@workflow()
async def refund_order(order_id: str) -> dict:
    return {"order_id": order_id, "refunded": True}
# workflows/notifications.py
from pyworkflow import workflow, step, sleep

@step()
async def send_email(to: str, subject: str, body: str) -> dict:
    return {"sent": True, "to": to}

@workflow()
async def send_notification(user_id: str, message: str) -> dict:
    result = await send_email(f"{user_id}@example.com", "Notification", message)
    return result

@workflow()
async def send_bulk_notifications(user_ids: list[str], message: str) -> dict:
    results = []
    for user_id in user_ids:
        result = await send_email(f"{user_id}@example.com", "Notification", message)
        results.append(result)
        await sleep("1s")  # Rate limiting
    return {"sent": len(results)}
How Discovery Works
When PyWorkflow imports your module:
1. Module Import: Python imports the specified module (e.g., workflows or workflows/__init__.py)
2. Decorator Registration: The @workflow and @step decorators automatically register functions in the global registry
3. Explicit Exports: For package directories, __init__.py imports trigger the decorators
The key is that importing your module must trigger the @workflow decorators to run.
With a package directory, make sure __init__.py imports all workflow functions.
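A quick sanity check is that a plain import exposes the workflows (a sketch; it assumes the Option 2 package above, where __init__.py re-exports process_order):

# Importing the module is what registers the workflows.
import importlib

mod = importlib.import_module("workflows")  # @workflow / @step decorators run on import
print(mod.process_order)                    # re-exported by workflows/__init__.py

Once the import works, pyworkflow workflows list run from the project root should show the same workflows.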
Nested Packages
For large applications, you can nest packages deeper:
myproject/
├── pyworkflow.config.yaml
└── myapp/
└── workflows/
├── __init__.py
├── orders/
│ ├── __init__.py
│ └── processing.py
└── notifications/
├── __init__.py
└── email.py
# pyworkflow.config.yaml
module: myapp.workflows
Each nested __init__.py should re-export workflows from its submodules to ensure
they are discovered when the top-level module is imported.
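One way to wire this up, reusing the workflow names from the earlier examples (a sketch; the submodule contents are assumed to mirror orders.py and notifications.py above):

# myapp/workflows/orders/__init__.py
from .processing import process_order, refund_order

# myapp/workflows/notifications/__init__.py
from .email import send_notification

# myapp/workflows/__init__.py
from .orders import process_order, refund_order
from .notifications import send_notification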
Programmatic Configuration
For more control, configure PyWorkflow in your Python code:
import pyworkflow
from pyworkflow.storage import FileStorageBackend

# Configure once at application startup
pyworkflow.configure(
    default_runtime="celery",  # or "local"
    default_durable=True,
    storage=FileStorageBackend("./workflow_data"),
    celery_broker="redis://localhost:6379/0",
)

# All subsequent start() calls use these defaults
async def main():
    run_id = await pyworkflow.start(my_workflow, arg1, arg2)
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| default_runtime | str | "local" | Default runtime: "local" or "celery" |
| default_durable | bool | False | Whether workflows are durable by default |
| default_retries | int | 3 | Default retry count for steps |
| storage | StorageBackend | None | Storage backend instance |
| celery_broker | str | None | Celery broker URL |
| aws_region | str | None | AWS region (for Lambda runtimes) |
| event_soft_limit | int | 10,000 | Event count to start logging warnings |
| event_hard_limit | int | 50,000 | Event count to terminate workflow |
| event_warning_interval | int | 100 | Events between warnings after soft limit |
Event limit settings should not be modified unless you fully understand the implications. See Limitations for details.
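As an illustration, the general-purpose defaults can be combined in a single configure() call (a sketch; the values are examples only):

import pyworkflow

pyworkflow.configure(
    default_runtime="local",
    default_durable=True,
    default_retries=5,  # steps retry up to 5 times by default
)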
Storage Backends
from pyworkflow.storage import FileStorageBackend, InMemoryStorageBackend

# File-based storage (persistent)
pyworkflow.configure(
    storage=FileStorageBackend("./workflow_data")
)

# In-memory storage (for testing)
pyworkflow.configure(
    storage=InMemoryStorageBackend()
)
Per-Call Overrides
Override configuration for individual start() calls:
import pyworkflow

# Override runtime for this specific call
run_id = await pyworkflow.start(
    my_workflow,
    arg1, arg2,
    runtime="local",  # Override: run locally instead of Celery
    durable=False,    # Override: transient execution
)

# Use custom storage for this call
from pyworkflow.storage import InMemoryStorageBackend

run_id = await pyworkflow.start(
    my_workflow,
    arg1, arg2,
    storage=InMemoryStorageBackend(),  # Override storage
)
Parameter Priority Example
import pyworkflow

# Global config: runtime="local", durable=False
pyworkflow.configure(default_runtime="local", default_durable=False)

# This call uses local runtime, transient (from config)
await pyworkflow.start(workflow_a, "arg")

# This call overrides to celery runtime, durable
await pyworkflow.start(workflow_b, "arg", runtime="celery", durable=True)

# This call uses local runtime (from config), but durable=True (override)
await pyworkflow.start(workflow_c, "arg", durable=True)
Environment Variables
Environment variables provide deployment flexibility:
| Variable | Description |
|---|---|
| PYWORKFLOW_MODULE | Module for workflow discovery |
| PYWORKFLOW_RUNTIME | Default runtime (local or celery) |
| PYWORKFLOW_STORAGE_BACKEND | Storage backend type |
| PYWORKFLOW_STORAGE_PATH | Path for file storage |
| PYWORKFLOW_CELERY_BROKER | Celery broker URL |
| PYWORKFLOW_CELERY_RESULT_BACKEND | Celery result backend URL |
| PYWORKFLOW_DISCOVER | Modules to import for workflow discovery |
# Example: Production deployment with environment variables
export PYWORKFLOW_RUNTIME=celery
export PYWORKFLOW_CELERY_BROKER=redis://redis-cluster:6379/0
export PYWORKFLOW_STORAGE_PATH=/data/workflows

python -m myapp.main
Configuration Patterns
Development vs Production
# pyworkflow.config.yaml (dev)
module: myapp.workflows
runtime: local    # Run in-process for fast iteration
storage:
  backend: memory # No persistence needed

# pyworkflow.config.yaml (prod)
module: myapp.workflows
runtime: celery
storage:
  backend: file
  path: /data/workflows
celery:
  broker: redis://redis:6379/0
  result_backend: redis://redis:6379/1
Testing Configuration
import pytest
import pyworkflow
from pyworkflow.storage import InMemoryStorageBackend

@pytest.fixture(autouse=True)
def reset_config():
    """Reset PyWorkflow config before each test."""
    pyworkflow.reset_config()
    pyworkflow.configure(
        default_runtime="local",
        default_durable=True,
        storage=InMemoryStorageBackend(),
    )
    yield
    pyworkflow.reset_config()
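With the fixture in place, tests can start workflows directly (a sketch; it assumes pytest-asyncio is installed and reuses the process_order workflow from the earlier example):

import pytest
import pyworkflow
from myapp.workflows import process_order  # assumed import path

@pytest.mark.asyncio
async def test_process_order_completes():
    run_id = await pyworkflow.start(process_order, "order-123", 49.99)
    assert run_id is not None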
Conditional Configuration
import os
import pyworkflow

if os.getenv("ENVIRONMENT") == "production":
    pyworkflow.configure(
        default_runtime="celery",
        default_durable=True,
        celery_broker=os.getenv("CELERY_BROKER_URL"),
    )
else:
    pyworkflow.configure(
        default_runtime="local",
        default_durable=False,
    )
Fault Tolerance Settings
Configure auto recovery behavior for workflows that experience worker crashes.
Config File:

# pyworkflow.config.yaml
recovery:
  recover_on_worker_loss: true
  max_recovery_attempts: 5

Programmatic:

import pyworkflow

pyworkflow.configure(
    default_recover_on_worker_loss=True,
    default_max_recovery_attempts=5,
)

Per-Workflow:

from pyworkflow import workflow

@workflow(
    recover_on_worker_loss=True,
    max_recovery_attempts=3,
)
async def resilient_workflow():
    pass
Recovery Options
| Option | Type | Default | Description |
|---|---|---|---|
| recover_on_worker_loss | bool | True (durable) | Enable automatic recovery when a worker crashes |
| max_recovery_attempts | int | 3 | Maximum number of recovery attempts before marking as failed |
For durable workflows, recovery replays events to restore state. For transient workflows, recovery restarts from the beginning. See Fault Tolerance for details.
Reading Current Configuration
Access the current configuration programmatically:
from pyworkflow.config import get_config

config = get_config()
print(f"Runtime: {config.default_runtime}")
print(f"Durable: {config.default_durable}")
print(f"Storage: {config.storage}")
print(f"Celery Broker: {config.celery_broker}")
Worker Configuration
When running Celery workers, you can configure worker behavior through CLI options.
See the CLI Guide for all available options including:
- Autoscaling: Automatically scale worker processes based on load
- Task limits: Control tasks per child and prefetch multiplier
- Time limits: Set hard and soft time limits for tasks
- Celery passthrough: Forward arbitrary arguments to Celery
# Example: Production worker with autoscaling and task limits
pyworkflow worker run --step --autoscale 2,10 --max-tasks-per-child 100
# Example: Pass arbitrary Celery options
pyworkflow worker run -- --max-memory-per-child=200000
Next Steps