
What is a Step?

A step is an isolated, retryable unit of work within a workflow. Steps are where the actual business logic executes: calling APIs, processing data, sending emails, and so on. Each step runs on a Celery worker and can be retried independently if it fails.
from pyworkflow import step

@step()
async def send_email(to: str, subject: str, body: str):
    """Send an email - retries automatically on failure."""
    async with EmailClient() as client:
        await client.send(to=to, subject=subject, body=body)
    return {"sent": True, "to": to}

Key Characteristics

Isolated

Each step runs independently with its own retry policy and timeout.

Retryable

Failed steps automatically retry with configurable backoff strategies.

Cached

Completed step results are cached and replayed during workflow resumption.

Distributed

Steps execute on Celery workers, distributing load across your cluster.

Creating Steps

import httpx
from pyworkflow import step

@step()
async def fetch_user(user_id: str):
    # base_url is a placeholder for your API host
    async with httpx.AsyncClient(base_url="https://api.example.com") as client:
        response = await client.get(f"/api/users/{user_id}")
        return response.json()

Configuration Options

@step(
    name="fetch_user_data",      # Custom step name
    max_retries=5,               # Retry up to 5 times
    retry_delay="exponential",   # Exponential backoff
    timeout="30s"                # Step timeout
)
async def fetch_user(user_id: str):
    pass
| Option | Type | Default | Description |
|---|---|---|---|
| name | str | Function/class name | Unique identifier for the step |
| max_retries | int | 3 | Maximum retry attempts |
| retry_delay | str | "fixed" | Retry strategy: "fixed", "exponential", or a duration |
| timeout | str | "60s" | Maximum step execution time |
| force_local | bool | False | Execute inline in the orchestrator instead of dispatching to Celery |

Retry Strategies

Fixed Delay

Retry with a constant delay between attempts:
@step(max_retries=3, retry_delay="30s")
async def call_api():
    # Retries at: 30s, 60s, 90s
    pass

Exponential Backoff

Retry with increasing delays (recommended for external APIs):
@step(max_retries=5, retry_delay="exponential")
async def call_external_api():
    # Retries at: 1s, 2s, 4s, 8s, 16s
    pass
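The delay sequence above follows a doubling schedule. A minimal sketch of how such delays could be computed (the 1-second base is an assumption for illustration, not a documented PyWorkflow internal):

```python
def exponential_delays(max_retries: int, base: float = 1.0) -> list[float]:
    """Delay before each retry attempt: base * 2**attempt."""
    return [base * 2 ** attempt for attempt in range(max_retries)]

# Five retries with a 1s base reproduce the schedule above.
exponential_delays(5)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```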

Custom Delay in Error

Specify retry delay when raising an error:
from pyworkflow import step, RetryableError

@step(max_retries=3)
async def rate_limited_api():
    response = await call_api()

    if response.status_code == 429:
        retry_after = response.headers.get("Retry-After", "60")
        raise RetryableError(
            "Rate limited",
            retry_after=f"{retry_after}s"
        )

    return response.json()

Error Handling

RetryableError

Use for transient failures that should be retried:
from pyworkflow import step, RetryableError

@step(max_retries=3)
async def fetch_data():
    try:
        return await external_api.get_data()
    except ConnectionError:
        raise RetryableError("Connection failed")
    except TimeoutError:
        raise RetryableError("Request timed out", retry_after="10s")

FatalError

Use for permanent failures that should stop the workflow:
from pyworkflow import step, FatalError

@step()
async def validate_input(data: dict):
    if "email" not in data:
        raise FatalError("Email is required")

    if not is_valid_email(data["email"]):
        raise FatalError(f"Invalid email: {data['email']}")

    return data

Error Flow

Step Execution

      ├─── Success ───────────────────────> Return Result

      └─── Exception

              ├─── FatalError ────────────> Workflow Failed

              └─── RetryableError / Other

                      ├─── Retries Left ──> Wait & Retry

                      └─── No Retries ────> Workflow Failed

Step Results and Caching

When a workflow resumes after suspension, completed steps are not re-executed. Instead, their cached results are returned:
from pyworkflow import workflow, sleep

@workflow()
async def my_workflow():
    # First run: executes the step
    # After resume: returns cached result
    user = await fetch_user("user_123")

    await sleep("1h")

    # After 1 hour, workflow resumes
    # fetch_user is NOT called again - cached result is used
    await send_email(user["email"], "Hello!")
Step results must be serializable. PyWorkflow supports common Python types (dict, list, str, int, datetime, etc.) and uses cloudpickle for complex objects.
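Conceptually, replay works like memoization keyed by step invocation. A minimal sketch of the idea (not PyWorkflow's actual storage layer; names here are illustrative):

```python
import asyncio

_results: dict[str, object] = {}
calls = 0

async def run_step(step_id: str, fn, *args):
    """Execute fn once; later invocations with the same step_id replay the cached result."""
    if step_id in _results:
        return _results[step_id]
    result = await fn(*args)
    _results[step_id] = result
    return result

async def fetch_user(user_id: str):
    global calls
    calls += 1  # track real executions to show replay skips them
    return {"id": user_id}

async def main():
    first = await run_step("fetch_user:1", fetch_user, "user_123")
    replayed = await run_step("fetch_user:1", fetch_user, "user_123")  # cached, not re-run
    return first, replayed

first, replayed = asyncio.run(main())
```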

Parallel Step Execution

Execute multiple steps concurrently using asyncio.gather():
import asyncio
from pyworkflow import workflow, step

@step()
async def fetch_user(user_id: str):
    return await api.get_user(user_id)

@step()
async def fetch_orders(user_id: str):
    return await api.get_orders(user_id)

@step()
async def fetch_preferences(user_id: str):
    return await api.get_preferences(user_id)

@workflow()
async def load_dashboard(user_id: str):
    # All three steps run in parallel
    user, orders, preferences = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_preferences(user_id)
    )

    return {
        "user": user,
        "orders": orders,
        "preferences": preferences
    }
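asyncio.gather preserves argument order in its results regardless of completion order, which is what makes the tuple unpacking above reliable. A quick standalone illustration:

```python
import asyncio

async def tagged(name: str, delay: float):
    await asyncio.sleep(delay)
    return name

async def main():
    # "b" and "c" finish before "a", but results stay in argument order.
    return await asyncio.gather(
        tagged("a", 0.02),
        tagged("b", 0.0),
        tagged("c", 0.01),
    )

results = asyncio.run(main())  # ["a", "b", "c"]
```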

Calling Primitives from Steps

Steps running within a durable workflow can call certain workflow primitives. When a step executes on a Celery worker, the worker automatically sets up a workflow context with the parent workflow’s metadata, enabling primitives to function.

Supported Primitives

| Primitive | Supported | Behavior |
|---|---|---|
| start_child_workflow() | Yes | Both wait_for_completion=True (polling) and False (fire-and-forget) |
| sleep() | Yes | Falls back to asyncio.sleep (non-durable) |
| hook() / define_hook() | No | Requires workflow suspension |

Starting Child Workflows from Steps

You can start child workflows from within a step using start_child_workflow(). Both wait_for_completion=True and wait_for_completion=False are supported:
from pyworkflow import step, start_child_workflow, workflow

@workflow(durable=True)
async def payment_workflow(order_id: str, amount: float):
    await process_payment(order_id, amount)
    return {"paid": True, "amount": amount}

@step()
async def process_order(order_id: str):
    # Wait for child workflow result (uses polling on step workers)
    result = await start_child_workflow(
        payment_workflow,
        order_id,
        99.99,
        wait_for_completion=True,
    )
    return {"order_id": order_id, "payment": result}

@step()
async def process_order_async(order_id: str):
    # Fire-and-forget: returns ChildWorkflowHandle immediately
    handle = await start_child_workflow(
        payment_workflow,
        order_id,
        99.99,
        wait_for_completion=False,
    )
    return {"order_id": order_id, "payment_run_id": handle.child_run_id}
Steps cannot suspend. When wait_for_completion=True is used from a step, the step worker blocks and polls storage for the child workflow's completion (with exponential backoff from 1s up to 10s). The worker remains occupied for the entire duration of the child workflow. If the child takes 10 minutes, the step worker is busy for 10 minutes.

At the workflow level, wait_for_completion=True suspends the workflow and frees the worker immediately. From a step, this is not possible: the worker must wait. Keep this in mind for capacity planning, and prefer wait_for_completion=False with a ChildWorkflowHandle if the child may be long-running.
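The polling schedule described above (exponential from 1s, capped at 10s) produces delays like this sketch computes:

```python
def poll_delays(polls: int, base: float = 1.0, cap: float = 10.0) -> list[float]:
    """Delay before each storage poll: doubles until it hits the cap."""
    delays, delay = [], base
    for _ in range(polls):
        delays.append(delay)
        delay = min(delay * 2, cap)
    return delays

poll_delays(6)  # [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```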

Sleeping within Steps

sleep() works within steps but uses asyncio.sleep instead of durable suspension:
import httpx
from pyworkflow import step, sleep

@step()
async def rate_limited_api_call(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code == 429:
            await sleep("30s")  # Non-durable: uses asyncio.sleep on the worker
            response = await client.get(url)
        return response.json()
Unlike workflow-level sleep which frees the worker, sleep() within a step holds the worker for the duration. If the worker crashes during the sleep, the state is lost. For long delays, prefer workflow-level sleep.

Hooks are Not Supported from Steps

hook() and define_hook() cannot be called from within steps. Hooks require workflow suspension to wait for external events, which is not possible during step execution. Move hook calls to workflow-level code instead.

Force Local Execution

By default, steps in a Celery runtime are dispatched to worker processes via the message broker. For lightweight steps where the broker round-trip adds unnecessary latency, use force_local=True to execute the step inline in the orchestrator process:
from pyworkflow import step

@step(force_local=True)
async def merge_results(user: dict, orders: list):
    """Runs inline - no Celery dispatch overhead."""
    return {
        "user": user,
        "total_orders": len(orders),
        "latest_order": orders[0] if orders else None,
    }
Force-local steps retain all durability guarantees:
  • STEP_STARTED and STEP_COMPLETED events are recorded
  • Results are cached for replay during workflow resumption
  • Retry policies (max_retries, retry_delay) are respected
  • Cancellation checks still apply
The only difference is that execution happens in the orchestrator process rather than being dispatched to a remote Celery worker.
Use force_local for steps that are fast and lightweight (data merging, formatting, simple transformations). CPU-intensive or I/O-heavy steps should use the default distributed execution so they benefit from worker scaling.

Best Practices

Each step should do one thing well. This makes retries more efficient - if a step fails, only that specific operation is retried.
Steps may be retried, so ensure they can be safely re-executed. Use idempotency keys when calling external APIs.
@step()
async def charge_payment(order_id: str, amount: float):
    # Use order_id as idempotency key
    return await stripe.charges.create(
        amount=amount,
        idempotency_key=f"charge-{order_id}"
    )
Set timeouts based on expected execution time. External API calls should have shorter timeouts than data processing steps.
Distinguish between retryable and fatal errors. Don’t retry errors that will never succeed.
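For HTTP-based steps, a small classifier helps apply this rule consistently. The status-code policy below is a common convention, not a PyWorkflow requirement; adjust it for your API:

```python
def is_retryable_status(status_code: int) -> bool:
    """429 and 5xx are transient; other 4xx responses will never succeed on retry."""
    return status_code == 429 or 500 <= status_code < 600

# In a step: raise RetryableError when is_retryable_status(...) is True,
# and FatalError otherwise.
```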

Next Steps