What is a Step?

A step is an isolated, retryable unit of work within a workflow. Steps are where the actual business logic executes - calling APIs, processing data, sending emails, etc. Each step runs on a Celery worker and can be retried independently if it fails.
from pyworkflow import step

@step()
async def send_email(to: str, subject: str, body: str):
    """Send an email - retries automatically on failure."""
    async with EmailClient() as client:
        await client.send(to=to, subject=subject, body=body)
    return {"sent": True, "to": to}

Key Characteristics

Isolated

Each step runs independently with its own retry policy and timeout.

Retryable

Failed steps automatically retry with configurable backoff strategies.

Cached

Completed step results are cached and replayed during workflow resumption.

Distributed

Steps execute on Celery workers, distributing load across your cluster.

Creating Steps

Steps can be created with the @step() decorator (shown below) or as a class:
import httpx
from pyworkflow import step

@step()
async def fetch_user(user_id: str):
    async with httpx.AsyncClient(base_url="https://api.example.com") as client:  # placeholder base URL
        response = await client.get(f"/api/users/{user_id}")
        return response.json()
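
Calling a step from a workflow looks like an ordinary awaited function call; PyWorkflow dispatches the step to a worker and records its result. A minimal sketch using the fetch_user step above (the workflow name is illustrative):
from pyworkflow import workflow

@workflow()
async def show_user_profile(user_id: str):
    # Awaiting the step runs it on a worker and returns its result
    return await fetch_user(user_id)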

Configuration Options

Configuration options are passed to the @step() decorator (or the equivalent class-based form):
@step(
    name="fetch_user_data",      # Custom step name
    max_retries=5,               # Retry up to 5 times
    retry_delay="exponential",   # Exponential backoff
    timeout="30s"                # Step timeout
)
async def fetch_user(user_id: str):
    pass
Option        Type   Default               Description
------------  -----  --------------------  --------------------------------------------------
name          str    Function/class name   Unique identifier for the step
max_retries   int    3                     Maximum retry attempts
retry_delay   str    "fixed"               Retry strategy: "fixed", "exponential", or duration
timeout       str    "60s"                 Maximum step execution time

Retry Strategies

Fixed Delay

Retry with a constant delay between attempts:
@step(max_retries=3, retry_delay="30s")
async def call_api():
    # Retries at: 30s, 60s, 90s
    pass

Exponential Backoff

Retry with increasing delays (recommended for external APIs):
@step(max_retries=5, retry_delay="exponential")
async def call_external_api():
    # Retries at: 1s, 2s, 4s, 8s, 16s
    pass

Custom Delay in Error

Specify retry delay when raising an error:
from pyworkflow import step, RetryableError

@step(max_retries=3)
async def rate_limited_api():
    response = await call_api()

    if response.status_code == 429:
        retry_after = response.headers.get("Retry-After", "60")
        raise RetryableError(
            "Rate limited",
            retry_after=f"{retry_after}s"
        )

    return response.json()

Error Handling

RetryableError

Use for transient failures that should be retried:
from pyworkflow import step, RetryableError

@step(max_retries=3)
async def fetch_data():
    try:
        return await external_api.get_data()
    except ConnectionError:
        raise RetryableError("Connection failed")
    except TimeoutError:
        raise RetryableError("Request timed out", retry_after="10s")

FatalError

Use for permanent failures that should stop the workflow:
from pyworkflow import step, FatalError

@step()
async def validate_input(data: dict):
    if "email" not in data:
        raise FatalError("Email is required")

    if not is_valid_email(data["email"]):
        raise FatalError(f"Invalid email: {data['email']}")

    return data

Error Flow

Step Execution
      ├─── Success ───────────────────────> Return Result
      └─── Exception
              ├─── FatalError ────────────> Workflow Failed
              └─── RetryableError / Other
                      ├─── Retries Left ──> Wait & Retry
                      └─── No Retries ────> Workflow Failed
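
The two error types can be combined in one step so that permanent failures stop the workflow while transient ones retry with backoff. A hedged sketch (the endpoint and status-code handling are illustrative):
import httpx
from pyworkflow import step, RetryableError, FatalError

@step(max_retries=5, retry_delay="exponential")
async def fetch_report(report_id: str):
    async with httpx.AsyncClient(base_url="https://api.example.com") as client:
        response = await client.get(f"/reports/{report_id}")

    if response.status_code >= 500:
        # Server-side problem - transient, let the step retry
        raise RetryableError(f"Upstream error {response.status_code}")
    if response.status_code >= 400:
        # Client-side problem - retrying will never succeed
        raise FatalError(f"Request for report {report_id} rejected: {response.status_code}")

    return response.json()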

Step Results and Caching

When a workflow resumes after suspension, completed steps are not re-executed. Instead, their cached results are returned:
@workflow()
async def my_workflow():
    # First run: executes the step
    # After resume: returns cached result
    user = await fetch_user("user_123")

    await sleep("1h")

    # After 1 hour, workflow resumes
    # fetch_user is NOT called again - cached result is used
    await send_email(user["email"], "Hello!")
Step results must be serializable. PyWorkflow supports common Python types (dict, list, str, int, datetime, etc.) and uses cloudpickle for complex objects.
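
For example, built-in types serialize directly, while a richer object such as a dataclass goes through cloudpickle (a sketch; the Invoice type is illustrative):
from dataclasses import dataclass
from datetime import datetime
from pyworkflow import step

@dataclass
class Invoice:
    id: str
    issued_at: datetime
    total: float

@step()
async def load_invoice(invoice_id: str):
    # A plain dict of str/int/datetime would serialize natively;
    # the Invoice dataclass is handled via cloudpickle
    return Invoice(id=invoice_id, issued_at=datetime.now(), total=99.0)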

Parallel Step Execution

Execute multiple steps concurrently using asyncio.gather():
import asyncio
from pyworkflow import workflow, step

@step()
async def fetch_user(user_id: str):
    return await api.get_user(user_id)

@step()
async def fetch_orders(user_id: str):
    return await api.get_orders(user_id)

@step()
async def fetch_preferences(user_id: str):
    return await api.get_preferences(user_id)

@workflow()
async def load_dashboard(user_id: str):
    # All three steps run in parallel
    user, orders, preferences = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_preferences(user_id)
    )

    return {
        "user": user,
        "orders": orders,
        "preferences": preferences
    }
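
The same pattern handles a dynamic fan-out - one step invocation per item, all gathered together. A sketch continuing the example above (fetch_order and api.get_order are illustrative):
@step()
async def fetch_order(order_id: str):
    return await api.get_order(order_id)

@workflow()
async def load_orders(order_ids: list[str]):
    # Fan out one step call per order id; gather waits for all of them
    return await asyncio.gather(*(fetch_order(oid) for oid in order_ids))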

Best Practices

Keep Steps Focused

Each step should do one thing well. This makes retries more efficient - if a step fails, only that specific operation is retried.
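
For instance, an order workflow can keep payment, inventory, and notification as separate steps so that a failure in one does not re-run the others. A sketch reusing the charge_payment step below, plus two hypothetical steps (update_inventory, notify_customer):
@workflow()
async def fulfil_order(order_id: str):
    # If notify_customer fails, only that step is retried -
    # the payment is not charged again
    await charge_payment(order_id, amount=49.0)
    await update_inventory(order_id)
    await notify_customer(order_id)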

Make Steps Idempotent

Steps may be retried, so ensure they can be safely re-executed. Use idempotency keys when calling external APIs:
@step()
async def charge_payment(order_id: str, amount: float):
    # Use order_id as idempotency key
    return await stripe.charges.create(
        amount=amount,
        idempotency_key=f"charge-{order_id}"
    )

Set Appropriate Timeouts

Set timeouts based on expected execution time. External API calls should have shorter timeouts than data processing steps.
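
For example (the timeout values, rates_api, and search_index helpers are illustrative):
@step(timeout="10s")
async def lookup_exchange_rate(currency: str):
    # Fast external API call - fail quickly and let the retry policy handle it
    return await rates_api.get(currency)

@step(timeout="1h", max_retries=1)
async def rebuild_search_index():
    # Long-running data processing - allow far more time
    await search_index.rebuild()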

Use the Right Error Type

Distinguish between retryable and fatal errors. Don’t retry errors that will never succeed.

Next Steps