## Installation

Install PyWorkflow using pip:
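A typical install command follows (assuming the package is published on PyPI under the name `pyworkflow`; check the project's actual package name):

```bash
pip install pyworkflow
```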
## Create a New Project

The fastest way to get started is with the `quickstart` command:
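Run it from your project directory (the command name matches the non-interactive examples later in this guide):

```bash
pyworkflow quickstart
```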
This interactive command will:

- Create a `workflows/` directory with sample workflows
- Generate a `pyworkflow.config.yaml` configuration file
- Optionally start Docker services (Redis + Dashboard)
### Quickstart Output Example

```text
============================================================
 PyWorkflow Quickstart
============================================================

Select a project template:
❯ Basic - Order processing and notifications (2 workflows)

Storage backend:
❯ SQLite - Single file database (recommended)

Start Docker services (Redis + Dashboard)? [Y/n]

Creating project structure...
✓ Created: workflows/__init__.py
✓ Created: workflows/orders.py
✓ Created: workflows/notifications.py
✓ Created: pyworkflow.config.yaml

============================================================
 Project Created!
============================================================

Next steps:

1. Start a worker:
   $ pyworkflow worker run

2. Run a workflow:
   $ pyworkflow workflows run process_order \
       --input '{"order_id": "123", "amount": 49.99}'

3. View the dashboard:
   Open http://localhost:5173 in your browser
```
### Non-Interactive Mode

For CI/CD or scripting, use non-interactive mode:

```bash
# Create project with defaults (SQLite storage, start Docker)
pyworkflow quickstart --non-interactive

# Create project without Docker
pyworkflow quickstart --non-interactive --skip-docker

# Use file storage instead of SQLite
pyworkflow quickstart --non-interactive --storage file
```
## Manual Setup

If you prefer to set up manually or need more control, generate the Docker configuration and start services:

```bash
# Generate docker-compose.yml and config
pyworkflow setup

# Start services
docker compose up -d
```

This starts Redis and the PyWorkflow Dashboard. Alternatively, start each component manually:

```bash
# 1. Start Redis
docker run -d -p 6379:6379 redis:7-alpine

# 2. Create config file
cat > pyworkflow.config.yaml << EOF
module: workflows
runtime: celery
storage:
  type: sqlite
  base_path: pyworkflow_data/pyworkflow.db
celery:
  broker: redis://localhost:6379/0
  result_backend: redis://localhost:6379/1
EOF

# 3. Start Celery worker
pyworkflow worker run
```
## Your First Workflow

Create a simple onboarding workflow that sends emails with delays:

```python
from pyworkflow import workflow, step, start, sleep

@step()
async def send_welcome_email(user_id: str):
    """Send a welcome email to the new user."""
    print(f"Sending welcome email to user {user_id}")
    return f"Email sent to {user_id}"

@step()
async def send_tips_email(user_id: str):
    """Send helpful tips after the welcome period."""
    print(f"Sending tips email to user {user_id}")
    return f"Tips sent to {user_id}"

@workflow()
async def onboarding_workflow(user_id: str):
    # Send welcome email immediately
    await send_welcome_email(user_id)

    # Sleep for 1 day - workflow suspends, zero resources used
    await sleep("1d")

    # Automatically resumes after 1 day
    await send_tips_email(user_id)

    return "Onboarding complete"

# Start the workflow
run_id = start(onboarding_workflow, user_id="user_123")
print(f"Workflow started: {run_id}")
```
## What Happens Under the Hood

1. **Workflow starts:** Your workflow is dispatched to an available Celery worker.
2. **Welcome email sent:** The `send_welcome_email` step executes and the result is recorded.
3. **Workflow suspends:** When `sleep("1d")` is called, the workflow suspends and the worker is freed. Zero resources are consumed during the sleep period.
4. **Automatic resumption:** After 1 day, Celery Beat automatically schedules the workflow to resume.
5. **Tips email sent:** The workflow picks up where it left off, sending the tips email.
6. **Workflow completes:** The final result is recorded and the workflow is marked as complete.
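The suspend-and-resume cycle above can be sketched in plain Python. This is a simplified illustration of the durable-execution idea, not PyWorkflow's actual internals: completed step results go into an event log, and on resumption the workflow re-runs, replaying recorded results instead of re-executing finished steps.

```python
event_log = {}  # step name -> recorded result (a real engine persists this)

def run_step(name, fn):
    if name in event_log:       # step already completed on an earlier run
        return event_log[name]  # replay the recorded result, don't re-execute
    result = fn()
    event_log[name] = result    # record for future replays
    return result

def onboarding(user_id):
    welcome = run_step("welcome", lambda: f"Email sent to {user_id}")
    if "slept" not in event_log:    # first pass: suspend instead of blocking
        event_log["slept"] = True   # (a real engine schedules a wake-up timer)
        return ("SUSPENDED", welcome)
    tips = run_step("tips", lambda: f"Tips sent to {user_id}")
    return ("DONE", tips)

print(onboarding("user_123"))  # first run stops after the welcome email
print(onboarding("user_123"))  # "resumed" run replays it, then sends tips
```

The second call never re-sends the welcome email; it reads the recorded result and continues from the sleep point.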
## Key Concepts
### Adding Error Handling

Make your workflows fault-tolerant with automatic retries:
```python
from pyworkflow import step, RetryableError, FatalError

@step(max_retries=3, retry_delay="exponential")
async def call_payment_api(amount: float):
    """Process payment with automatic retry on failure."""
    try:
        result = await payment_gateway.charge(amount)
        return result
    except PaymentGatewayTimeoutError:
        # Retry with exponential backoff
        raise RetryableError("Gateway timeout", retry_after="10s")
    except InsufficientFundsError:
        # Don't retry - this is a permanent failure
        raise FatalError("Insufficient funds")
```
Use `RetryableError` for transient failures (network issues, timeouts) and `FatalError` for permanent failures (invalid input, business rule violations).
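To make the retry schedule concrete, here is a rough sketch of what exponential backoff delays look like, assuming the delay doubles from a 1-second base on each attempt (PyWorkflow's exact schedule may differ):

```python
def backoff_delays(base=1.0, max_retries=3):
    """Delay in seconds before each retry attempt: base, 2*base, 4*base, ..."""
    return [base * 2 ** attempt for attempt in range(max_retries)]

print(backoff_delays())  # [1.0, 2.0, 4.0]
```

With `max_retries=3`, a step that keeps timing out is attempted four times in total (the original call plus three retries) before failing for good.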
### Running in Parallel

Execute multiple steps concurrently using `asyncio.gather()`:
```python
import asyncio
from pyworkflow import workflow, step

@step()
async def fetch_user(user_id: str):
    return {"id": user_id, "name": "Alice"}

@step()
async def fetch_orders(user_id: str):
    return [{"id": "ORD-1"}, {"id": "ORD-2"}]

@step()
async def fetch_recommendations(user_id: str):
    return ["Product A", "Product B"]

@workflow()
async def dashboard_data(user_id: str):
    # Fetch all data in parallel
    user, orders, recommendations = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_recommendations(user_id),
    )
    return {
        "user": user,
        "orders": orders,
        "recommendations": recommendations,
    }
```
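The concurrency win here is plain `asyncio` behavior and can be seen without PyWorkflow at all: three simulated 0.1-second fetches complete in roughly 0.1 seconds total rather than 0.3, because they wait in parallel. A standalone sketch:

```python
import asyncio
import time

async def fake_fetch(name: str, delay: float):
    await asyncio.sleep(delay)  # stand-in for network I/O
    return name

async def main():
    start = time.monotonic()
    results = await asyncio.gather(
        fake_fetch("user", 0.1),
        fake_fetch("orders", 0.1),
        fake_fetch("recommendations", 0.1),
    )
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)   # ['user', 'orders', 'recommendations']
print(elapsed)   # roughly 0.1, not 0.3
```

Note that `asyncio.gather()` returns results in the order the awaitables were passed, regardless of which finishes first.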
## Next Steps