PyWorkflow uses event sourcing to achieve durable, fault-tolerant execution. Instead of storing just the current state, every state change is recorded as an immutable event in an append-only log. This enables:
Durability: Workflows survive crashes and restarts
Replay: Workflows can resume from any point
Auditability: Complete history of everything that happened
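As a mental model only, an append-only log and the state derived from it might look like the sketch below. `Event`, `EventLog`, `current_state`, and the `workflow_started`/`workflow_completed` event types are hypothetical names for illustration, not PyWorkflow's classes. Only the `sequence`, `type`, `data`, and `timestamp` fields mirror the event attributes used elsewhere on this page.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Tuple


@dataclass(frozen=True)
class Event:
    """One immutable log entry (hypothetical shape, not PyWorkflow's class)."""
    sequence: int
    type: str
    data: Dict[str, Any]
    timestamp: datetime


class EventLog:
    """Append-only: events can be added and read, never changed or removed."""

    def __init__(self) -> None:
        self._events = []

    def append(self, type: str, data: Dict[str, Any]) -> Event:
        event = Event(
            sequence=len(self._events) + 1,
            type=type,
            data=dict(data),  # copy so later caller mutations don't leak in
            timestamp=datetime.now(timezone.utc),
        )
        self._events.append(event)
        return event

    def events(self) -> Tuple[Event, ...]:
        # Hand out a tuple snapshot so callers cannot rewrite history.
        return tuple(self._events)


def current_state(log: EventLog) -> Dict[str, Any]:
    """Derive state by folding over the full history instead of storing it."""
    state: Dict[str, Any] = {"status": "pending", "results": {}}
    for event in log.events():
        if event.type == "workflow_started":
            state["status"] = "running"
        elif event.type == "step_completed":
            state["results"][event.data["step"]] = event.data["result"]
        elif event.type == "workflow_completed":
            state["status"] = "completed"
    return state


log = EventLog()
log.append("workflow_started", {"run_id": "run_xyz789"})
log.append("step_completed", {"step": "charge_card", "result": "ch_1"})
print(current_state(log))  # {'status': 'running', 'results': {'charge_card': 'ch_1'}}
```

Because the current state is always recomputed from the log, nothing is lost if the process dies between events: restarting and folding over the same events yields the same state.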
When a workflow resumes after suspension, PyWorkflow replays all recorded events to restore the exact state:
```python
# Replay process:
# 1. Load all events for this run_id
# 2. For each step_completed event, cache the result
# 3. Re-execute the workflow function
# 4. When a step is called, return cached result instead of executing
# 5. Continue from where we left off
```
During replay, steps are not re-executed; their cached results from the event log are returned immediately. Because the workflow receives exactly the results it saw on the original run, it retraces the same path and arrives at the same state.
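The five replay steps can be sketched in plain Python as a toy model. It assumes cached results are matched to step calls purely by call order; `ReplayContext`, `run_step`, the dict-shaped events, and the example steps are hypothetical names for illustration, not PyWorkflow's API or internals.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


class ReplayContext:
    """Hypothetical replay engine (illustration only, not PyWorkflow's code)."""

    def __init__(self, events: List[Dict[str, Any]]) -> None:
        # Steps 1-2: take the run's events and cache each step_completed result, in order.
        self.cached = [e["result"] for e in events if e["type"] == "step_completed"]
        self.cursor = 0
        self.executed: List[str] = []                 # steps that actually ran
        self.new_events: List[Dict[str, Any]] = []    # events to append to the log

    async def run_step(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        if self.cursor < len(self.cached):
            # Step 4: a cached result exists, so return it without calling fn.
            result = self.cached[self.cursor]
        else:
            # Step 5: past the end of the history, execute for real and record it.
            result = await fn()
            self.executed.append(name)
            self.new_events.append({"type": "step_completed", "step": name, "result": result})
        self.cursor += 1
        return result


async def fetch_price() -> int:
    return 100


async def apply_discount(price: int) -> int:
    return price - 10


async def order_workflow(ctx: ReplayContext) -> int:
    # Step 3: the workflow function is simply run again from the top.
    price = await ctx.run_step("fetch_price", fetch_price)
    return await ctx.run_step("apply_discount", lambda: apply_discount(price))


# The log shows fetch_price had already completed before the run was suspended.
history = [{"type": "step_completed", "step": "fetch_price", "result": 100}]
ctx = ReplayContext(history)
print(asyncio.run(order_workflow(ctx)))  # 90
print(ctx.executed)  # ['apply_discount']
```

Matching results by position is exactly why determinism matters: if the workflow calls its steps in a different order on replay, it is handed the wrong cached results.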
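To inspect the recorded history of a run, read its events from the storage backend:

```python
import asyncio

from pyworkflow.storage.file import FileStorageBackend


async def main() -> None:
    storage = FileStorageBackend()

    # Get all events for a workflow run
    events = await storage.get_events("run_xyz789")
    for event in events:
        print(f"{event.sequence}: {event.type}")
        print(f"  Data: {event.data}")
        print(f"  Time: {event.timestamp}")


asyncio.run(main())
```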
For replay to work correctly, workflows must be deterministic:
Don’t do this - Non-deterministic operations break replay:
```python
import random
from datetime import datetime


@workflow()
async def bad_workflow():
    # BAD: Random values differ on replay
    order_id = f"ORD-{random.randint(1000, 9999)}"

    # BAD: Current time differs on replay
    if datetime.now().hour < 12:
        await morning_flow()
    else:
        await afternoon_flow()
```
Do this instead - Use steps for non-deterministic operations:
```python
import random
from datetime import datetime


@step()
async def generate_order_id():
    # Results are cached, so same ID on replay
    return f"ORD-{random.randint(1000, 9999)}"


@step()
async def get_current_time():
    # Cached, so same time on replay
    return datetime.now()


@workflow()
async def good_workflow():
    order_id = await generate_order_id()
    current_time = await get_current_time()

    if current_time.hour < 12:
        await morning_flow()
    else:
        await afternoon_flow()
```
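To see why the bad version breaks replay, here is a toy recorder that replays steps by position and checks that each replayed call names the same step the log recorded. The check itself is an assumption made for illustration: `Recorder`, `NonDeterminismError`, `value`, and the `hour_now` parameter (standing in for `datetime.now().hour`) are hypothetical, and this page does not say how PyWorkflow reports a diverging replay.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional, Tuple


class NonDeterminismError(Exception):
    """Raised when replay calls a different step than the log recorded."""


class Recorder:
    """Hypothetical step recorder (illustration only, not PyWorkflow's code)."""

    def __init__(self, history: Optional[List[Tuple[str, Any]]] = None) -> None:
        self.history: List[Tuple[str, Any]] = list(history or [])
        self.cursor = 0

    async def step(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        if self.cursor < len(self.history):
            # Replaying: the workflow must ask for the same step, in the same order.
            recorded_name, result = self.history[self.cursor]
            if recorded_name != name:
                raise NonDeterminismError(
                    f"step {self.cursor}: log has {recorded_name!r}, code called {name!r}"
                )
        else:
            # New work: run the step and record its result.
            result = await fn()
            self.history.append((name, result))
        self.cursor += 1
        return result


async def value(v: Any) -> Any:
    """Stand-in for real step bodies such as morning_flow / afternoon_flow."""
    return v


async def bad_workflow(rec: Recorder, hour_now: int) -> Any:
    # BAD: hour_now plays the role of datetime.now().hour read outside a step.
    if hour_now < 12:
        return await rec.step("morning_flow", lambda: value("morning"))
    return await rec.step("afternoon_flow", lambda: value("afternoon"))


async def good_workflow(rec: Recorder, hour_now: int) -> Any:
    # GOOD: the hour comes from a step, so replay reuses the recorded value.
    hour = await rec.step("get_current_hour", lambda: value(hour_now))
    if hour < 12:
        return await rec.step("morning_flow", lambda: value("morning"))
    return await rec.step("afternoon_flow", lambda: value("afternoon"))


# First run at 09:00, then replay the same log at 15:00.
first = Recorder()
asyncio.run(good_workflow(first, hour_now=9))
print(asyncio.run(good_workflow(Recorder(first.history), hour_now=15)))  # morning
```

Replaying `bad_workflow` the same way raises `NonDeterminismError`, because at 15:00 it asks for `afternoon_flow` while the log recorded `morning_flow`. The good version keeps following the morning branch it originally took.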
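Events are persisted by a storage backend. To keep them in a specific directory, pass `base_path` to `FileStorageBackend` and register the backend with `configure`:

```python
from pyworkflow import configure
from pyworkflow.storage.file import FileStorageBackend

# File storage (default)
storage = FileStorageBackend(
    base_path="/var/lib/pyworkflow/events"
)

# Configure PyWorkflow to use this storage
configure(storage=storage)
```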