What is Step Context?

Step Context provides a way to share workflow-level data with steps executing on remote Celery workers. Unlike the workflow context, which is process-local, Step Context is serialized and passed to workers, so steps running in other processes or on other machines can read it.
from pyworkflow import workflow, step, StepContext, get_step_context, set_step_context

class OrderContext(StepContext):
    workspace_id: str = ""
    user_id: str = ""
    order_id: str = ""

@workflow(context_class=OrderContext)
async def process_order(order_id: str, user_id: str):
    # Initialize context in workflow
    ctx = OrderContext(order_id=order_id, user_id=user_id)
    await set_step_context(ctx)

    # Steps can read the context
    result = await validate_order()
    return result

@step()
async def validate_order():
    ctx = get_step_context()  # Read-only access
    print(f"Validating order {ctx.order_id} for user {ctx.user_id}")
    return {"valid": True}

Key Characteristics

Type-Safe

Define typed context fields using Pydantic models with full IDE support.

Read-Only in Steps

Context is read-only during step execution to prevent race conditions.

Distributed

Context is automatically serialized and passed to Celery workers.

Event-Sourced

Context changes are recorded as events for deterministic replay.

Why Read-Only in Steps?

When steps execute in parallel on different workers, allowing them to modify shared context would cause race conditions:
Worker A: read context → modify → save     ─┐
                                            ├─> Lost update!
Worker B: read context → modify → save     ─┘
By making context read-only in steps, PyWorkflow follows the same pattern used by Temporal and Prefect: activities/tasks are stateless, and state changes flow back to the workflow through return values.
If you need to update context based on step results, do it in the workflow code after the step returns.
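The lost-update problem and the return-value alternative can be sketched with a small stand-in. This is an illustration only: the `Ctx` dataclass and both helper functions are hypothetical and use the standard library rather than PyWorkflow itself.

```python
from dataclasses import dataclass, replace


# Hypothetical stand-in for a frozen context (not PyWorkflow's StepContext).
@dataclass(frozen=True)
class Ctx:
    validated: bool = False
    charged: bool = False


def lost_update(stored: Ctx) -> Ctx:
    """Both workers read the same snapshot, then each saves its own copy."""
    snapshot_a = stored                            # worker A reads
    snapshot_b = stored                            # worker B reads
    stored = replace(snapshot_a, validated=True)   # worker A saves
    stored = replace(snapshot_b, charged=True)     # worker B saves, overwriting A
    return stored


def merge_in_workflow(stored: Ctx) -> Ctx:
    """Steps return plain results; the workflow applies them in one place."""
    result_a = {"validated": True}   # returned by step A
    result_b = {"charged": True}     # returned by step B
    return replace(stored, **result_a, **result_b)


print(lost_update(Ctx()))        # Ctx(validated=False, charged=True)
print(merge_in_workflow(Ctx()))  # Ctx(validated=True, charged=True)
```

In the first function worker A's change disappears. In the second, both results survive because only the workflow writes.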

Defining a Context Class

Create a context class by extending StepContext:
from pyworkflow import StepContext

class FlowContext(StepContext):
    # Required fields (no default)
    workspace_id: str

    # Optional fields (with defaults)
    user_id: str = ""
    request_id: str = ""
    tags: list[str] = []

Immutable by Design

Step Context is immutable (frozen). To update values, use with_updates() which creates a new instance:
ctx = FlowContext(workspace_id="ws-123")
print(ctx.workspace_id)  # "ws-123"

# Create new context with updated values
ctx = ctx.with_updates(user_id="user-456", request_id="req-789")
print(ctx.user_id)  # "user-456"

# Original values are preserved
print(ctx.workspace_id)  # "ws-123"
Direct attribute assignment raises an error:
ctx.user_id = "new-value"  # ValidationError!
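The copy-on-update behavior can be imitated with a frozen dataclass. This is a minimal sketch under stated assumptions: `SketchContext` is a hypothetical stand-in built on the standard library, whereas the real StepContext is a Pydantic model and raises a different exception type.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


# Hypothetical stand-in; shows the pattern, not PyWorkflow's implementation.
@dataclass(frozen=True)
class SketchContext:
    workspace_id: str
    user_id: str = ""

    def with_updates(self, **changes) -> "SketchContext":
        # Build a new instance; the original is left untouched.
        return replace(self, **changes)


original = SketchContext(workspace_id="ws-123")
updated = original.with_updates(user_id="user-456")

try:
    original.user_id = "new-value"   # frozen instances reject assignment
except FrozenInstanceError:
    assignment_blocked = True
else:
    assignment_blocked = False

print(original.user_id == "", updated.user_id, assignment_blocked)
```

Because every update produces a new object, code that still holds the old instance never sees it change.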

Using Step Context

Setting Context (Workflow Only)

Use set_step_context() to set or update the context. This can only be called from workflow code:
from pyworkflow import workflow, get_step_context, set_step_context

@workflow(context_class=OrderContext)
async def my_workflow(user_id: str):
    # Initialize context
    ctx = OrderContext(user_id=user_id, workspace_id="ws-123")
    await set_step_context(ctx)  # Note: async function

    # ... execute steps ...

    # Update context (creates new instance)
    ctx = get_step_context()
    ctx = ctx.with_updates(order_id="order-456")
    await set_step_context(ctx)
set_step_context() is an async function because it persists the context to storage.

Reading Context (Workflow and Steps)

Use get_step_context() to access the current context from anywhere:
from pyworkflow import step, get_step_context

@step()
async def send_notification():
    ctx = get_step_context()

    await notify_user(
        user_id=ctx.user_id,
        workspace_id=ctx.workspace_id,
        message=f"Order {ctx.order_id} processed"
    )

Checking Context Availability

Use has_step_context() to check if context is available:
import logging

from pyworkflow import step, has_step_context, get_step_context

logger = logging.getLogger(__name__)

@step()
async def optional_logging():
    if has_step_context():
        ctx = get_step_context()
        logger.info(f"Processing in workspace {ctx.workspace_id}")

Read-Only Enforcement

Attempting to set context from within a step raises a RuntimeError:
@step()
async def my_step():
    ctx = get_step_context()

    # This works - reading context
    print(ctx.workspace_id)

    # This raises RuntimeError!
    new_ctx = ctx.with_updates(workspace_id="new-ws")
    await set_step_context(new_ctx)  # RuntimeError: Cannot modify step context within a step
To update context based on step results, return data from the step and update in the workflow:
@workflow(context_class=OrderContext)
async def process_order(order_id: str):
    ctx = OrderContext(order_id=order_id)
    await set_step_context(ctx)

    # Step returns data instead of modifying context
    validation_result = await validate_order()

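    # Update context in workflow based on step result.
    # Assumes validate_order() returns a "needs_review" key and that
    # OrderContext declares a `status: str = ""` field.
    if validation_result["needs_review"]:
        ctx = get_step_context()
        ctx = ctx.with_updates(status="pending_review")
        await set_step_context(ctx)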

Context Persistence and Replay

Step Context is persisted for durability:
  1. Persistence: When you call set_step_context(), the context is stored in the WorkflowRun.context field.
  2. Replay: When a workflow resumes after suspension:
    • Context is restored from WorkflowRun.context
    • Steps receive the same context they had during original execution
@workflow(context_class=OrderContext)
async def durable_workflow(order_id: str):
    ctx = OrderContext(order_id=order_id)
    await set_step_context(ctx)  # Persisted to storage

    await some_step()

    await sleep("1h")  # Workflow suspends

    # After 1 hour, workflow resumes
    # Context is automatically restored from events
    ctx = get_step_context()
    print(ctx.order_id)  # Same value that was passed in as order_id - restored on resume
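The persist-and-restore cycle amounts to a serialization round trip. The sketch below imitates it with a dict standing in for storage. `SketchOrderContext`, `fake_storage`, `persist`, and `restore` are hypothetical names for illustration, and how PyWorkflow actually writes to WorkflowRun.context is not shown here.

```python
import json
from dataclasses import asdict, dataclass


# Hypothetical stand-in for a user-defined context class.
@dataclass(frozen=True)
class SketchOrderContext:
    order_id: str = ""
    user_id: str = ""


# Stand-in for the storage backend, keyed by run ID.
fake_storage = {}


def persist(run_id: str, ctx: SketchOrderContext) -> None:
    # Store a JSON string, as a real backend would store serialized data.
    fake_storage[run_id] = json.dumps(asdict(ctx))


def restore(run_id: str) -> SketchOrderContext:
    # Rebuild an equal context from the stored JSON when the workflow resumes.
    return SketchOrderContext(**json.loads(fake_storage[run_id]))


persist("run-1", SketchOrderContext(order_id="order-456", user_id="user-1"))
restored = restore("run-1")
print(restored.order_id)   # order-456
```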

Complex Context Types

Step Context supports complex nested types:
from pydantic import BaseModel

class Address(BaseModel):
    street: str
    city: str
    country: str

    model_config = {"frozen": True}

class CustomerContext(StepContext):
    customer_id: str
    email: str = ""
    shipping_address: Address | None = None
    metadata: dict[str, str] = {}

@workflow(context_class=CustomerContext)
async def ship_order(customer_id: str, address: dict):
    ctx = CustomerContext(
        customer_id=customer_id,
        shipping_address=Address(**address)
    )
    await set_step_context(ctx)
All context fields must be JSON-serializable. Avoid storing non-serializable objects like database connections or file handles.
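A quick way to catch non-serializable values before they reach a worker is to try encoding each field. The helper below is a hypothetical pre-flight check written against a plain dict of field values. It is not part of PyWorkflow.

```python
import io
import json


def non_serializable_fields(fields: dict) -> list[str]:
    """Return the names of fields whose values json.dumps cannot encode."""
    bad = []
    for name, value in fields.items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            bad.append(name)
    return bad


fields = {
    "customer_id": "c-1",
    "metadata": {"tier": "gold"},
    "handle": io.StringIO(),   # stands in for a file handle
}
print(non_serializable_fields(fields))   # ['handle']
```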

Best Practices

Store only essential cross-cutting data like IDs, user info, and configuration. Don’t use context as a data store - pass large data as step arguments instead.
# Good - small, essential data
class GoodContext(StepContext):
    workspace_id: str
    user_id: str
    request_id: str

# Bad - too much data
class BadContext(StepContext):
    workspace_id: str
    user_data: dict  # Could be large
    all_orders: list[dict]  # Definitely too large
Step Context is ideal for data needed by many steps: auth info, workspace IDs, correlation IDs, feature flags.
class RequestContext(StepContext):
    workspace_id: str
    user_id: str
    correlation_id: str  # For distributed tracing
    feature_flags: dict[str, bool] = {}
Set up context at the beginning of your workflow before calling any steps.
@workflow(context_class=MyContext)
async def my_workflow(workspace_id: str, user_id: str):
    # Initialize context first
    ctx = MyContext(workspace_id=workspace_id, user_id=user_id)
    await set_step_context(ctx)

    # Now call steps
    await step_one()
    await step_two()
Context is persisted to storage, so do not put secrets in it. Keep sensitive data in a secret manager or environment variables and store only a reference in the context.
# Bad - secrets in context
class BadContext(StepContext):
    api_key: str  # Don't do this!

# Good - reference to secret, not the secret itself
class GoodContext(StepContext):
    secret_name: str  # Reference to secret in vault

@step()
async def call_api():
    ctx = get_step_context()
    api_key = await secret_manager.get(ctx.secret_name)
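If no secret manager is available, the same reference pattern works with environment variables. In this sketch `resolve_secret` and the variable name `PAYMENTS_API_KEY` are made up for illustration. Only the name travels in the context, and the value is looked up where the step runs.

```python
import os


def resolve_secret(secret_name: str) -> str:
    """Look up a secret by name in the worker's environment."""
    value = os.environ.get(secret_name)
    if value is None:
        raise KeyError(f"Secret {secret_name!r} is not configured")
    return value


# The context carries only the reference, never the secret value.
context_fields = {"secret_name": "PAYMENTS_API_KEY"}

# Demo only: a real deployment would set this outside the code.
os.environ["PAYMENTS_API_KEY"] = "demo-value"

api_key = resolve_secret(context_fields["secret_name"])
print(api_key == "demo-value")
```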

API Reference

| Function | Description |
| --- | --- |
| StepContext | Base class for user-defined context |
| get_step_context() | Get current context (raises if not set) |
| set_step_context(ctx) | Set context (async, workflow only) |
| has_step_context() | Check if context is available |

StepContext Methods

| Method | Description |
| --- | --- |
| with_updates(**kwargs) | Create new context with updated fields |
| to_dict() | Serialize context to dictionary |
| from_dict(data) | Deserialize context from dictionary |
