A step is an isolated, retryable unit of work within a workflow. Steps are where the business logic runs: calling APIs, processing data, sending emails, and so on. Each step runs on a Celery worker and can be retried independently if it fails.
    from pyworkflow import step

    @step()
    async def send_email(to: str, subject: str, body: str):
        """Send an email - retries automatically on failure."""
        async with EmailClient() as client:
            await client.send(to=to, subject=subject, body=body)
        return {"sent": True, "to": to}
Use FatalError for permanent failures that should stop the workflow instead of being retried:
    from pyworkflow import step, FatalError

    @step()
    async def validate_input(data: dict):
        if "email" not in data:
            raise FatalError("Email is required")
        if not is_valid_email(data["email"]):
            raise FatalError(f"Invalid email: {data['email']}")
        return data
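To make the difference between a retryable failure and a permanent one concrete, here is a minimal sketch in plain asyncio. It is not PyWorkflow's implementation: `PermanentError`, `run_with_retries`, `max_retries`, and `delay` are hypothetical names chosen for illustration, with `PermanentError` standing in for `FatalError`.

```python
import asyncio


class PermanentError(Exception):
    """Hypothetical stand-in for FatalError: never retried."""


async def run_with_retries(func, *args, max_retries=3, delay=0.0):
    """Call func until it succeeds, raises PermanentError, or retries run out."""
    attempt = 0
    while True:
        try:
            return await func(*args)
        except PermanentError:
            # Permanent failure: retrying cannot help, so propagate immediately.
            raise
        except Exception:
            # Any other failure is treated as transient and retried.
            attempt += 1
            if attempt > max_retries:
                raise
            await asyncio.sleep(delay)


calls = {"flaky": 0, "invalid": 0}


async def flaky():
    """Fails twice with a transient error, then succeeds."""
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("temporary outage")
    return "ok"


async def invalid():
    """Always fails with a permanent error."""
    calls["invalid"] += 1
    raise PermanentError("Email is required")


async def demo():
    result = await run_with_retries(flaky)
    try:
        await run_with_retries(invalid)
        stopped = False
    except PermanentError:
        stopped = True
    return result, stopped


demo_result = asyncio.run(demo())
```

In this sketch the flaky function is attempted three times before it succeeds, while the invalid one is attempted exactly once, which mirrors the intent of raising `FatalError` for input that no retry can fix.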
When a workflow resumes after suspension, completed steps are not re-executed. Instead, their cached results are returned:
    @workflow()
    async def my_workflow():
        # First run: executes the step
        # After resume: returns cached result
        user = await fetch_user("user_123")

        await sleep("1h")

        # After 1 hour, workflow resumes
        # fetch_user is NOT called again - cached result is used
        await send_email(user["email"], "Hello!")
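The replay behaviour can be pictured as a lookup in a log of recorded results before a step is executed. The sketch below is only an illustration under that assumption, not PyWorkflow's internals: `event_log`, `run_step`, and the step key format are hypothetical, and the log lives in memory rather than in durable storage.

```python
import asyncio

# Hypothetical in-memory event log: maps a step key to its recorded result.
event_log = {}
# Records which steps actually executed (as opposed to being replayed).
executions = []


async def run_step(key, func, *args):
    """Run func once per key; later calls replay the recorded result."""
    if key in event_log:
        return event_log[key]
    result = await func(*args)
    executions.append(key)
    event_log[key] = result
    return result


async def fetch_user(user_id):
    return {"id": user_id, "email": f"{user_id}@example.com"}


async def workflow_body():
    return await run_step("fetch_user:user_123", fetch_user, "user_123")


first = asyncio.run(workflow_body())   # first run: executes fetch_user
second = asyncio.run(workflow_body())  # simulated resume: replays from the log
```

Both runs return the same user dictionary, but `executions` contains a single entry, because the second run never calls `fetch_user`.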
Step results must be serializable. PyWorkflow supports common Python types (dict, list, str, int, datetime, etc.) and uses cloudpickle for complex objects.
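A quick way to check whether a value is safe to return from a step is to round-trip it through a serializer. The sketch below uses the standard-library `pickle` module as a stand-in so it runs without extra dependencies; `is_serializable` is a hypothetical helper, not part of PyWorkflow, and cloudpickle accepts some objects that plain `pickle` rejects.

```python
import pickle
import threading
from datetime import datetime, timezone


def is_serializable(value):
    """Return True if value survives a pickle round trip unchanged."""
    try:
        return pickle.loads(pickle.dumps(value)) == value
    except Exception:
        return False


# Plain data made of dicts, lists, strings, and datetimes round-trips cleanly.
good = {"sent": True, "at": datetime(2024, 1, 1, tzinfo=timezone.utc), "tags": ["a", "b"]}

# Live resources such as locks cannot be pickled.
bad = {"lock": threading.Lock()}

good_ok = is_serializable(good)
bad_ok = is_serializable(bad)
```

Returning plain data (IDs, strings, dictionaries) rather than live resources such as clients, locks, or open connections keeps step results easy to store and replay.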