
What is Fault Tolerance?

In distributed systems, workers can fail unexpectedly due to crashes, OOM kills, network issues, or deployments. PyWorkflow’s fault tolerance ensures your workflows survive these failures and automatically resume from where they left off.

Automatic Detection

Worker crashes are detected automatically when Celery requeues tasks.

Event Replay

Completed steps are restored from the event log without re-execution.

Checkpoint Resume

Workflows continue from the last successful checkpoint, not from the beginning.

Configurable Limits

Control recovery attempts and behavior per workflow or globally.

How Auto Recovery Works

When a worker crashes mid-workflow, PyWorkflow automatically recovers:
┌─────────────────────────────────────────────────────────────────────┐
│ Worker A crashes while executing workflow                           │
│                                                                     │
│   1. Celery detects WorkerLostError                                │
│   2. Task is requeued to the broker                                │
│   3. Worker B picks up the task                                    │
│   4. Detects workflow is in RUNNING/INTERRUPTED status             │
│   5. Loads event log from storage                                  │
│   6. Replays events to restore state:                              │
│      - Step results are cached (skip re-execution)                 │
│      - Pending sleeps are marked complete                          │
│   7. Workflow continues from last checkpoint                       │
│                                                                     │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    │
│   │  Step 1  │───▶│  Step 2  │───▶│  Crash!  │    │  Step 3  │    │
│   │(complete)│    │(complete)│    │          │    │(pending) │    │
│   └──────────┘    └──────────┘    └────┬─────┘    └──────────┘    │
│                                        │                           │
│                                        │ Recovery                  │
│                                        ▼                           │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    │
│   │  Step 1  │───▶│  Step 2  │───▶│  Resume  │───▶│  Step 3  │    │
│   │ (replay) │    │ (replay) │    │          │    │(execute) │    │
│   └──────────┘    └──────────┘    └──────────┘    └──────────┘    │
└─────────────────────────────────────────────────────────────────────┘
The key insight is that event sourcing makes recovery possible. All completed steps are recorded as events, so on recovery, PyWorkflow simply replays those events to restore the workflow’s state without re-executing any work.
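To make this concrete, here is a rough sketch of the kind of records the event log might hold for the diagram above. The dictionary shapes and field names are illustrative assumptions, not PyWorkflow's actual storage format:
# Illustrative only: event shapes and field names are assumptions,
# not PyWorkflow's actual storage format.
event_log = [
    {"type": "step_completed", "step": "step_1", "result": {"ok": True}},
    {"type": "step_completed", "step": "step_2", "result": {"items": 42}},
    {"type": "workflow_interrupted", "reason": "worker_lost"},
    # No event was recorded for step_3, so only step_3 runs after recovery.
]
Replaying the two step_completed events restores their results from the log, and execution picks up at step_3.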

Configuration

Configure recovery per-workflow using the @workflow decorator:
from pyworkflow import workflow, step

@workflow(
    recover_on_worker_loss=True,    # Enable auto recovery
    max_recovery_attempts=5,         # Allow up to 5 recovery attempts
)
async def resilient_workflow(order_id: str):
    # Steps that complete before a crash won't re-execute
    order = await validate_order(order_id)
    payment = await process_payment(order)

    # If worker crashes here, workflow resumes from this point
    await create_shipment(order)
    return {"status": "completed"}

Configuration Options

Option                 | Type | Default                            | Description
recover_on_worker_loss | bool | True (durable) / False (transient) | Enable automatic recovery on worker crash
max_recovery_attempts  | int  | 3                                  | Maximum number of recovery attempts before marking as failed

Configuration Priority

When resolving recovery settings, PyWorkflow uses this priority order:
Priority    | Source                 | Example
1 (highest) | @workflow() decorator  | @workflow(recover_on_worker_loss=True)
2           | pyworkflow.configure() | configure(default_recover_on_worker_loss=True)
3           | Config file            | recovery.recover_on_worker_loss: true
4 (lowest)  | Built-in defaults      | True for durable, False for transient
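For instance, here is a minimal sketch of how the top two layers interact; the workflow name and body are placeholders:
from pyworkflow import configure, workflow

# Global default (priority 2): applies to any workflow that doesn't set it itself.
configure(default_recover_on_worker_loss=True)

# Decorator argument (priority 1): overrides the global default for this workflow only.
@workflow(recover_on_worker_loss=False)
async def manual_recovery_workflow(order_id: str):
    ...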

Durable vs Transient Workflows

Recovery behavior differs based on workflow durability. Transient workflows do not record events, so recovery is disabled for them by default.
Durable workflows record each completed step as an event and resume from the last checkpoint:
@workflow(durable=True, recover_on_worker_loss=True)
async def durable_pipeline(data_id: str):
    # These steps are recorded as events
    data = await fetch_data(data_id)      # Event: step_completed
    data = await validate_data(data)       # Event: step_completed

    await sleep("10m")                     # Event: sleep_started

    # If crash happens here, on recovery:
    # - fetch_data and validate_data results restored from events
    # - sleep marked as complete
    # - Execution continues from transform_data

    data = await transform_data(data)      # Executes after recovery
    return data
Recovery process:
  1. Load event log from storage
  2. Replay step_completed events (restore cached results)
  3. Complete pending sleep_started events
  4. Continue execution from the next step
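The replay step can be pictured as a small loop over the recorded events. This is a simplified sketch of the idea, not PyWorkflow's internal implementation; it uses the same illustrative event shapes shown earlier:
def replay_events(events: list[dict]) -> dict:
    """Rebuild workflow state from recorded events without re-running any step."""
    cached_results = {}
    sleeps_completed = 0
    for event in events:
        if event["type"] == "step_completed":
            # Cache the recorded result so this step is skipped on resume.
            cached_results[event["step"]] = event["result"]
        elif event["type"] == "sleep_started":
            # A sleep that was pending at crash time is marked complete.
            sleeps_completed += 1
    return {"results": cached_results, "sleeps_completed": sleeps_completed}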

Workflow States

Auto recovery introduces a new workflow state:
┌─────────────┐
│   PENDING   │  Workflow created, waiting to start
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   RUNNING   │  Workflow is executing
└──────┬──────┘

       ├────────────────┬────────────────┐
       │                │                │
       ▼                ▼                ▼
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  SUSPENDED  │  │ INTERRUPTED │  │   FAILED    │
└──────┬──────┘  └──────┬──────┘  └─────────────┘
       │                │
       │                │ (auto recovery)
       │                ▼
       │         ┌─────────────┐
       │         │   RUNNING   │  Recovered, resuming
       │         └──────┬──────┘
       │                │
       └────────────────┤
                        │
                        ▼
                  ┌─────────────┐
                 │  COMPLETED  │  Workflow finished
                 └─────────────┘
Status      | Description
INTERRUPTED | Worker crashed; workflow is awaiting recovery
RUNNING     | Workflow is executing (or has been recovered)

Monitoring Recovery

Use the CLI to monitor workflows that have been interrupted or recovered:
# List interrupted workflows
pyworkflow runs list --status interrupted

# List all workflows with recovery info
pyworkflow runs list --workflow my_workflow

# View detailed status including recovery attempts
pyworkflow runs status <run_id>

# View full event log including WORKFLOW_INTERRUPTED events
pyworkflow runs logs <run_id>
Example output:
$ pyworkflow runs status abc123
Run ID:       abc123
Workflow:     data_pipeline
Status:       running
Started:      2025-01-15 10:30:00
Recovery:     2/3 attempts (recover_on_worker_loss: true)
Last Event:   workflow.resumed (2025-01-15 10:35:00)

When to Disable Recovery

If your workflow makes calls that can’t be safely repeated (e.g., charging a credit card without idempotency keys), disable recovery or implement compensation logic.
@workflow(recover_on_worker_loss=False)
async def non_idempotent_workflow(amount: float):
    # This payment call might succeed but crash before recording
    # Recovery would charge the customer again!
    await charge_credit_card(amount)  # Dangerous without idempotency
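If the payment provider accepts an idempotency key (the parameter name below is an assumption about that provider, not a PyWorkflow API), the same call becomes safe to repeat and recovery can stay enabled:
from pyworkflow import workflow

@workflow(recover_on_worker_loss=True)
async def idempotent_payment_workflow(order_id: str, amount: float):
    # charge_credit_card is the same illustrative payment helper as above;
    # repeating the call with the same key charges the customer at most once.
    await charge_credit_card(amount, idempotency_key=f"charge-{order_id}")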
Some workflows should fail loudly and require human intervention rather than automatic recovery.
@workflow(
    recover_on_worker_loss=False,
    max_recovery_attempts=0,
)
async def critical_financial_workflow(amount: float):
    # Any failure should be reviewed by humans
    await transfer_funds(amount)
If your workflow interacts with systems that don’t support rollback or compensation, partial re-execution could leave them in an inconsistent state.
@workflow(recover_on_worker_loss=False)
async def legacy_integration(data: dict):
    # Legacy system has no rollback capability
    await update_legacy_system(data)

Best Practices

Design steps to produce the same result when called multiple times with the same input. Use idempotency keys for external API calls.
@step()
async def create_order(order_id: str):
    # Use order_id as idempotency key
    return await api.create_order(
        order_id=order_id,
        idempotency_key=f"order-{order_id}"
    )
Don’t allow unlimited recovery attempts. Set max_recovery_attempts based on your tolerance for repeated failures.
@workflow(
    recover_on_worker_loss=True,
    max_recovery_attempts=3,  # Fail after 3 attempts
)
async def bounded_recovery_workflow():
    pass
Set up alerts for workflows that reach INTERRUPTED status frequently. This may indicate infrastructure issues.
# Example: Alert if more than 5 interrupted workflows in an hour
pyworkflow runs list --status interrupted --since 1h | wc -l
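The same check can be scripted, for example from a cron job. This sketch assumes the CLI prints one line per run (check your version's output format) and uses a placeholder send_alert function:
import subprocess

INTERRUPTED_THRESHOLD = 5  # alert if more than 5 interrupted runs in the last hour

def check_interrupted_runs() -> None:
    # Wraps the documented CLI command; one-line-per-run output is an assumption.
    result = subprocess.run(
        ["pyworkflow", "runs", "list", "--status", "interrupted", "--since", "1h"],
        capture_output=True,
        text=True,
        check=True,
    )
    count = len([line for line in result.stdout.splitlines() if line.strip()])
    if count > INTERRUPTED_THRESHOLD:
        send_alert(f"{count} workflows interrupted in the last hour")

def send_alert(message: str) -> None:
    # Placeholder: wire this to your paging or chat system.
    print(f"ALERT: {message}")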
Critical business workflows should always use durable mode to ensure proper recovery.
@workflow(
    durable=True,  # Event sourcing enabled
    recover_on_worker_loss=True,
)
async def critical_business_workflow():
    pass

Next Steps