What is Event Sourcing?

PyWorkflow uses event sourcing to achieve durable, fault-tolerant execution. Instead of storing just the current state, every state change is recorded as an immutable event in an append-only log. This enables:
  • Durability: Workflows survive crashes and restarts
  • Replay: Workflows can resume from any point
  • Auditability: Complete history of everything that happened
Workflow Execution Timeline
────────────────────────────────────────────────────────►

Event 1          Event 2           Event 3           Event 4
workflow_started → step_completed → sleep_started → ...

       │              │                │
       ▼              ▼                ▼
┌──────────────────────────────────────────────────┐
│              Event Log (Append-Only)              │
└──────────────────────────────────────────────────┘
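
The event log itself is nothing more than an ordered, append-only sequence of immutable records. The toy sketch below (an illustration, not PyWorkflow's internals) makes the core invariant concrete: events are only ever appended, never updated or deleted.

# Toy append-only event log (illustration only).
log: list[dict] = []

def record(run_id: str, event_type: str, data: dict) -> None:
    # Appending is the only write operation; existing entries never change.
    log.append({
        "run_id": run_id,
        "type": event_type,
        "sequence": len(log) + 1,  # strictly increasing position in the log
        "data": data,
    })

record("run_xyz789", "workflow_started", {"workflow": "order_processing"})
record("run_xyz789", "step_completed", {"step_id": "validate_order"})

# Restoring state means reading the log in order and applying each event.
for event in log:
    print(event["sequence"], event["type"])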

How It Works

Recording Events

As your workflow executes, PyWorkflow automatically records events:
from pyworkflow import workflow, sleep  # assumed import path

@workflow()
async def order_workflow(order_id: str):
    # Event: workflow_started

    order = await validate_order(order_id)
    # Event: step_completed (validate_order)

    await sleep("1h")
    # Event: sleep_started
    # Workflow suspends here...

    # ... 1 hour later ...
    # Event: workflow_resumed

    await send_confirmation(order)
    # Event: step_completed (send_confirmation)

    return order
    # Event: workflow_completed

Replaying Events

When a workflow resumes after suspension, PyWorkflow replays all recorded events to restore the exact state:
# Replay process:
# 1. Load all events for this run_id
# 2. For each step_completed event, cache the result
# 3. Re-execute the workflow function
# 4. When a step is called, return cached result instead of executing
# 5. Continue from where we left off
During replay, steps are not re-executed. Their cached results from the event log are returned immediately. This ensures deterministic execution.
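
To make step 4 concrete, here is a minimal sketch of replay-time caching (a hypothetical helper, not PyWorkflow's actual code): the runner checks a cache rebuilt from step_completed events before executing anything.

# Hypothetical replay helper, for illustration only.
# step_results is rebuilt from step_completed events before re-execution.
step_results: dict[str, object] = {}

async def run_step(step_id: str, func, *args):
    if step_id in step_results:
        # Replay: the event log already holds this result; skip execution.
        return step_results[step_id]
    result = await func(*args)        # live run: execute the step for real
    step_results[step_id] = result    # ...and record it as step_completed
    return result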

Event Types

PyWorkflow records the following event types, grouped into four categories:

Workflow Events

Event              | Description
-------------------|-------------------------------------
workflow_started   | Workflow execution began
workflow_completed | Workflow finished successfully
workflow_failed    | Workflow terminated with an error
workflow_suspended | Workflow paused (sleep, webhook)
workflow_resumed   | Workflow continued after suspension

Step Events

Event          | Description
---------------|--------------------------------------------
step_started   | Step execution began
step_completed | Step finished successfully (result cached)
step_failed    | Step failed (may retry)
step_retrying  | Step is being retried

Sleep Events

Event           | Description
----------------|-----------------------------------
sleep_started   | Sleep/delay began
sleep_completed | Sleep finished, workflow resuming

Log Events

Event       | Description
------------|----------------------------
log_info    | Info-level log message
log_warning | Warning-level log message
log_error   | Error-level log message
log_debug   | Debug-level log message

Event Structure

Each event contains:
{
    "id": "evt_abc123",           # Unique event ID
    "run_id": "run_xyz789",       # Workflow run ID
    "type": "step_completed",     # Event type
    "timestamp": "2025-01-15T10:30:45Z",
    "sequence": 3,                # Order in the event log
    "data": {                     # Event-specific data
        "step_id": "validate_order",
        "step_name": "validate_order",
        "result": {"order_id": "ORD-123", "valid": True},
        "duration_ms": 150
    }
}
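
In Python terms this maps naturally onto a small immutable record. Here is a sketch of such a model, with field names taken from the structure above (the actual PyWorkflow event class may differ):

from dataclasses import dataclass
from datetime import datetime
from typing import Any

@dataclass(frozen=True)  # frozen: an event never changes once recorded
class Event:
    id: str
    run_id: str
    type: str
    timestamp: datetime
    sequence: int
    data: dict[str, Any]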

Inspecting Events

Via Storage Backend

from pyworkflow.storage.file import FileStorageBackend

storage = FileStorageBackend()

# Get all events for a workflow run
events = await storage.get_events("run_xyz789")

for event in events:
    print(f"{event.sequence}: {event.type}")
    print(f"  Data: {event.data}")
    print(f"  Time: {event.timestamp}")

Example Event Log

Sequence | Type              | Data
---------|-------------------|----------------------------------
1        | workflow_started  | {workflow: "order_processing"}
2        | step_started      | {step_id: "validate_order"}
3        | step_completed    | {step_id: "validate_order", result: {...}}
4        | step_started      | {step_id: "process_payment"}
5        | step_failed       | {step_id: "process_payment", error: "timeout"}
6        | step_retrying     | {step_id: "process_payment", attempt: 2}
7        | step_started      | {step_id: "process_payment"}
8        | step_completed    | {step_id: "process_payment", result: {...}}
9        | sleep_started     | {duration: "1h", wake_time: "..."}
10       | workflow_suspended| {reason: "sleep"}

Deterministic Replay

For replay to work correctly, workflows must be deterministic:
Don’t do this - Non-deterministic operations break replay:
import random
from datetime import datetime

@workflow()
async def bad_workflow():
    # BAD: Random values differ on replay
    order_id = f"ORD-{random.randint(1000, 9999)}"

    # BAD: Current time differs on replay
    if datetime.now().hour < 12:
        await morning_flow()
    else:
        await afternoon_flow()
Do this instead - Use steps for non-deterministic operations:
@step()
async def generate_order_id():
    # Results are cached, so same ID on replay
    return f"ORD-{random.randint(1000, 9999)}"

@step()
async def get_current_time():
    # Cached, so same time on replay
    return datetime.now()

@workflow()
async def good_workflow():
    order_id = await generate_order_id()
    current_time = await get_current_time()

    if current_time.hour < 12:
        await morning_flow()
    else:
        await afternoon_flow()

Storage Backends

Events are stored in a pluggable storage backend:
Backend    | Status    | Use Case
-----------|-----------|------------------------------
File       | Available | Development, single-machine
Redis      | Planned   | Production, distributed
PostgreSQL | Planned   | Enterprise, complex queries
SQLite     | Planned   | Embedded applications
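
All backends expose the same small interface, which is what makes them interchangeable. A hedged sketch of what that contract could look like (only get_events appears in the examples on this page; append_event is an assumed name):

from abc import ABC, abstractmethod

class StorageBackend(ABC):
    """Sketch of a pluggable backend contract (illustrative; names may differ)."""

    @abstractmethod
    async def append_event(self, event) -> None:
        """Persist one event at the end of the run's log."""

    @abstractmethod
    async def get_events(self, run_id: str) -> list:
        """Return all events for a run, ordered by sequence."""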

Configuring Storage

from pyworkflow.storage.file import FileStorageBackend

# File storage (default)
storage = FileStorageBackend(
    base_path="/var/lib/pyworkflow/events"
)

# Configure PyWorkflow to use this storage
from pyworkflow import configure
configure(storage=storage)

Benefits of Event Sourcing

Complete Audit Trail

Every action is recorded. Know exactly what happened and when.

Time Travel Debugging

Replay workflows to debug issues. See the exact state at any point.

Failure Recovery

Resume from the last successful point after a crash or restart.

Event-Driven Architecture

Events can trigger other systems, enabling loose coupling.
