In distributed systems, workers can fail unexpectedly due to crashes, OOM kills, network issues, or deployments. PyWorkflow’s fault tolerance ensures your workflows survive these failures and automatically resume from where they left off.
- **Automatic Detection:** Worker crashes are detected automatically when Celery requeues tasks.
- **Event Replay:** Completed steps are restored from the event log without re-execution.
- **Checkpoint Resume:** Workflows continue from the last successful checkpoint, not from the beginning.
- **Configurable Limits:** Control recovery attempts and behavior per workflow or globally.
When a worker crashes mid-workflow, PyWorkflow automatically recovers:
```
┌─────────────────────────────────────────────────────────────────────┐
│ Worker A crashes while executing workflow                           │
│                                                                     │
│ 1. Celery detects WorkerLostError                                   │
│ 2. Task is requeued to the broker                                   │
│ 3. Worker B picks up the task                                       │
│ 4. Detects workflow is in RUNNING/INTERRUPTED status                │
│ 5. Loads event log from storage                                     │
│ 6. Replays events to restore state:                                 │
│    - Step results are cached (skip re-execution)                    │
│    - Pending sleeps are marked complete                             │
│ 7. Workflow continues from last checkpoint                          │
│                                                                     │
│ ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐        │
│ │  Step 1  │───▶│  Step 2  │───▶│  Crash!  │    │  Step 3  │        │
│ │(complete)│    │(complete)│    │          │    │(pending) │        │
│ └──────────┘    └──────────┘    └────┬─────┘    └──────────┘        │
│                                      │                              │
│                                      │ Recovery                     │
│                                      ▼                              │
│ ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐        │
│ │  Step 1  │───▶│  Step 2  │───▶│  Resume  │───▶│  Step 3  │        │
│ │ (replay) │    │ (replay) │    │          │    │(execute) │        │
│ └──────────┘    └──────────┘    └──────────┘    └──────────┘        │
└─────────────────────────────────────────────────────────────────────┘
```
The key insight is that event sourcing makes recovery possible. All completed steps are recorded as events, so on recovery, PyWorkflow simply replays those events to restore the workflow’s state without re-executing any work.
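The replay idea can be sketched in a few lines. This is an illustrative model, not PyWorkflow's actual internals: `StepCompleted`, `replay`, and `run_step` are hypothetical names standing in for the real event types and scheduler.

```python
# Minimal sketch of event-sourced recovery: replaying a log of
# completed-step events rebuilds a result cache, so previously
# finished steps are skipped instead of re-executed.
# All names here are illustrative, not PyWorkflow's real API.
from dataclasses import dataclass
from typing import Any

@dataclass
class StepCompleted:
    step_name: str
    result: Any

def replay(events):
    """Rebuild the step-result cache from the event log."""
    cache = {}
    for event in events:
        if isinstance(event, StepCompleted):
            cache[event.step_name] = event.result
    return cache

async def run_step(cache, name, func, *args):
    """Return the cached result if the step already completed."""
    if name in cache:
        return cache[name]          # restored from the event log
    result = await func(*args)      # only pending steps actually execute
    cache[name] = result            # would also be appended as a new event
    return result
```

On recovery, the cache built by `replay` makes every already-completed step a lookup rather than a re-execution, which is why recovery is cheap and side-effect-free for durable workflows.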
Configure recovery per workflow using the `@workflow` decorator:
```python
from pyworkflow import workflow, step

@workflow(
    recover_on_worker_loss=True,  # Enable auto recovery
    max_recovery_attempts=5,      # Allow up to 5 recovery attempts
)
async def resilient_workflow(order_id: str):
    # Steps that complete before a crash won't re-execute
    order = await validate_order(order_id)
    payment = await process_payment(order)

    # If worker crashes here, workflow resumes from this point
    await create_shipment(order)
    return {"status": "completed"}
```
Recovery behavior differs based on workflow durability:
**Durable Workflows**

Durable workflows resume from the last checkpoint.

```python
@workflow(durable=True, recover_on_worker_loss=True)
async def durable_pipeline(data_id: str):
    # These steps are recorded as events
    data = await fetch_data(data_id)   # Event: step_completed
    data = await validate_data(data)   # Event: step_completed
    await sleep("10m")                 # Event: sleep_started

    # If crash happens here, on recovery:
    # - fetch_data and validate_data results restored from events
    # - sleep marked as complete
    # - Execution continues from transform_data
    data = await transform_data(data)  # Executes after recovery
    return data
```

**Transient Workflows**

Transient workflows record no events, so there is no checkpoint to resume from.

```python
@workflow(durable=False, recover_on_worker_loss=True)
async def transient_batch(batch_id: str):
    # No events are recorded
    items = await fetch_items(batch_id)
    items = await process_items(items)

    # If crash happens here, on recovery:
    # - No event log to replay
    # - Workflow starts over from fetch_items
    items = await finalize_items(items)
    return items
```
Transient workflows with recovery enabled will restart from scratch on each recovery attempt. Ensure your steps are idempotent if using this pattern.
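One common way to make a step safe to repeat is to key its side effect on a deterministic identifier derived from the step's inputs, so a restarted workflow that re-runs the step becomes a no-op. A minimal sketch, assuming an in-memory `processed` dict standing in for a real datastore; all names here are hypothetical:

```python
# Sketch of an idempotent step for transient workflows: the side
# effect is keyed on a deterministic ID, so a restarted workflow
# that re-runs the step skips work that already happened.
# The in-memory `processed` dict stands in for a real datastore.
processed = {}

def process_item(batch_id: str, item_id: str, payload: str):
    key = f"{batch_id}:{item_id}"   # deterministic idempotency key
    if key in processed:
        return processed[key]       # already done: skip the side effect
    result = payload.upper()        # the actual (one-time) work
    processed[key] = result
    return result
```

The key property is that the key depends only on stable workflow inputs, never on anything random or time-based, so every recovery attempt computes the same key.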
Use the CLI to monitor workflows that have been interrupted or recovered:
```shell
# List interrupted workflows
pyworkflow runs list --status interrupted

# List all workflows with recovery info
pyworkflow runs list --workflow my_workflow

# View detailed status including recovery attempts
pyworkflow runs status <run_id>

# View full event log including WORKFLOW_INTERRUPTED events
pyworkflow runs logs <run_id>
```
If your workflow makes calls that can’t be safely repeated (e.g., charging a credit card without idempotency keys), disable recovery or implement compensation logic.
@workflow(recover_on_worker_loss=False)async def non_idempotent_workflow(): # This payment call might succeed but crash before recording # Recovery would charge the customer again! await charge_credit_card(amount) # Dangerous without idempotency
**Critical workflows requiring human review**
Some workflows should fail loudly and require human intervention rather than automatic recovery.
```python
@workflow(
    recover_on_worker_loss=False,
    max_recovery_attempts=0,
)
async def critical_financial_workflow():
    # Any failure should be reviewed by humans
    await transfer_funds(amount)
```
**External systems without rollback**
If your workflow interacts with systems that don’t support rollback or compensation, partial re-execution could leave inconsistent state.
```python
@workflow(recover_on_worker_loss=False)
async def legacy_integration():
    # Legacy system has no rollback capability
    await update_legacy_system(data)
```
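When the external system offers no rollback, the compensation logic mentioned earlier is the usual fallback: record an undo action for each completed step and apply them in reverse order if a later step fails. A minimal saga-style sketch; `run_with_compensation` and the action tuples are illustrative, not part of PyWorkflow:

```python
# Saga-style compensation sketch: each action carries an undo
# callback; on failure, completed actions are reversed in LIFO
# order before the error is re-raised.
# All names here are illustrative, not part of PyWorkflow.
def run_with_compensation(actions):
    """actions: list of (do, undo) callable pairs."""
    done = []
    try:
        for do, undo in actions:
            do()
            done.append(undo)
    except Exception:
        for undo in reversed(done):  # compensate in reverse order
            undo()
        raise
```

Compensation only helps if every step has a meaningful reversing action; when it does not (as with many legacy systems), disabling recovery and escalating to a human remains the safer choice.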