A step is an isolated, retryable unit of work within a workflow. Steps are where the actual business logic executes - calling APIs, processing data, sending emails, etc. Each step runs on a Celery worker and can be retried independently if it fails.
```python
from pyworkflow import step

@step()
async def send_email(to: str, subject: str, body: str):
    """Send an email - retries automatically on failure."""
    async with EmailClient() as client:
        await client.send(to=to, subject=subject, body=body)
    return {"sent": True, "to": to}
```
Raise FatalError for permanent failures that should stop the workflow instead of being retried:
```python
from pyworkflow import step, FatalError

@step()
async def validate_input(data: dict):
    if "email" not in data:
        raise FatalError("Email is required")
    if not is_valid_email(data["email"]):
        raise FatalError(f"Invalid email: {data['email']}")
    return data
```
When a workflow resumes after suspension, completed steps are not re-executed. Instead, their cached results are returned:
```python
@workflow()
async def my_workflow():
    # First run: executes the step
    # After resume: returns cached result
    user = await fetch_user("user_123")

    await sleep("1h")  # After 1 hour, workflow resumes

    # fetch_user is NOT called again - cached result is used
    await send_email(user["email"], "Hello!")
```
Step results must be serializable. PyWorkflow supports common Python types (dict, list, str, int, datetime, etc.) and uses cloudpickle for complex objects.
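To illustrate what "serializable" means in practice, the sketch below round-trips typical step results with the standard-library pickle. This is an illustration only, not PyWorkflow's internals: cloudpickle is a superset of pickle that additionally handles lambdas, closures, and locally defined classes, but it behaves identically for the types shown here.

```python
import pickle
from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrderSummary:
    order_id: str
    created_at: datetime

# Built-in types (dict, str, bool, int, datetime) round-trip directly
result = {"sent": True, "count": 3, "when": datetime(2024, 1, 1)}
assert pickle.loads(pickle.dumps(result)) == result

# Custom classes round-trip too; cloudpickle extends this to
# objects that plain pickle rejects, such as lambdas
summary = OrderSummary(order_id="ord_1", created_at=datetime(2024, 1, 1))
restored = pickle.loads(pickle.dumps(summary))
assert restored == summary
```

If a step returns something that cannot be pickled at all (an open socket or file handle, for example), the result cannot be cached for replay, so return plain data instead.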
Steps running within a durable workflow can call certain workflow primitives. When a step executes on a Celery worker, the worker automatically sets up a workflow context with the parent workflow’s metadata, enabling primitives to function.
You can start child workflows from within a step using start_child_workflow(). Both wait_for_completion=True and wait_for_completion=False are supported:
Steps cannot suspend. When wait_for_completion=True is used from a step, the step worker blocks and polls storage for the child workflow's completion (with exponential backoff from 1s up to 10s). The worker remains occupied for the entire duration of the child workflow. If the child takes 10 minutes, the step worker is busy for 10 minutes.

At the workflow level, wait_for_completion=True suspends the workflow and frees the worker immediately. From a step, this is not possible - the worker must wait. Keep this in mind for capacity planning, and prefer wait_for_completion=False with a ChildWorkflowHandle if the child may be long-running.
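The blocking wait can be pictured as a simple poll loop. The sketch below is illustrative, not PyWorkflow's actual internals - the is_complete check and the injectable sleep are stand-ins - but the 1s-to-10s exponential backoff matches the behavior described above:

```python
import asyncio

async def wait_for_child(is_complete, sleep=asyncio.sleep,
                         initial=1.0, cap=10.0):
    """Poll until the child workflow is done, backing off 1s -> 10s.

    The worker coroutine stays busy for the whole wait; nothing
    here suspends durably, unlike workflow-level waiting.
    """
    delays = []
    delay = initial
    while not is_complete():
        delays.append(delay)
        await sleep(delay)
        delay = min(delay * 2, cap)  # double each time, capped at 10s
    return delays

# Demo with a fake clock: the child "finishes" after six checks
remaining = {"polls": 6}

async def fake_sleep(_seconds):
    pass  # skip real waiting so the demo runs instantly

def done():
    remaining["polls"] -= 1
    return remaining["polls"] <= 0

delays = asyncio.run(wait_for_child(done, sleep=fake_sleep))
print(delays)  # [1.0, 2.0, 4.0, 8.0, 10.0]
```

The delay doubles on each poll until it hits the cap, so a long-running child costs at most one storage read every 10 seconds - but the worker slot is consumed regardless.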
sleep() works within steps but uses asyncio.sleep instead of durable suspension:
```python
import httpx
from pyworkflow import step, sleep

@step()
async def rate_limited_api_call(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        if response.status_code == 429:
            await sleep("30s")  # Non-durable: uses asyncio.sleep on the worker
            response = await client.get(url)
        return response.json()
```
Unlike workflow-level sleep, which frees the worker, sleep() within a step holds the worker for the duration. If the worker crashes during the sleep, that state is lost. For long delays, prefer workflow-level sleep.
hook() and define_hook() cannot be called from within steps. Hooks require workflow suspension to wait for external events, which is not possible during step execution. Move hook calls to workflow-level code instead.
By default, steps in a Celery runtime are dispatched to worker processes via the message broker. For lightweight steps where the broker round-trip adds unnecessary latency, use force_local=True to execute the step inline in the orchestrator process:
```python
from pyworkflow import step

@step(force_local=True)
async def merge_results(user: dict, orders: list):
    """Runs inline - no Celery dispatch overhead."""
    return {
        "user": user,
        "total_orders": len(orders),
        "latest_order": orders[0] if orders else None,
    }
```
Force-local steps retain all durability guarantees:
- STEP_STARTED and STEP_COMPLETED events are recorded
- Results are cached for replay during workflow resumption
- Retry policies (max_retries, retry_delay) are respected
- Cancellation checks still apply
The only difference is that execution happens in the orchestrator process rather than being dispatched to a remote Celery worker.
Use force_local for steps that are fast and lightweight (data merging, formatting, simple transformations). CPU-intensive or I/O-heavy steps should use the default distributed execution so they benefit from worker scaling.