PyWorkflow supports graceful workflow cancellation. When you cancel a workflow, it will terminate at the next checkpoint rather than being forcefully killed, allowing for proper cleanup.
Graceful Termination
Workflows stop at safe checkpoints, not mid-operation.
Cleanup Support
Catch CancellationError to perform cleanup before terminating.
Important: Cancellation does NOT interrupt a step that is already executing.If a step takes a long time (e.g., a 10-minute API call), the workflow will only detect cancellation after that step completes. This is by design to avoid leaving operations in an inconsistent state.
Workflows can catch CancellationError to perform cleanup before terminating:
Copy
from pyworkflow import workflow, step, CancellationError, shield@workflow()async def order_workflow(order_id: str): try: await reserve_inventory(order_id) await charge_payment(order_id) await create_shipment(order_id) return {"status": "completed"} except CancellationError: # Cleanup on cancellation async with shield(): # This cleanup will complete even if cancelled await release_inventory(order_id) await refund_payment(order_id) raise # Re-raise to mark workflow as cancelled
Use shield() to protect critical code from cancellation:
Copy
from pyworkflow import shieldasync with shield(): # This code will complete even if cancellation is requested await commit_transaction() await send_confirmation_email()
While inside a shield() block:
ctx.check_cancellation() will not raise CancellationError
The cancellation request is preserved
Cancellation will take effect after exiting the shield
Don’t use shield() for long-running operations as it defeats the purpose of graceful cancellation.
# List cancelled workflowspyworkflow runs list --status cancelled# View details of a cancelled workflowpyworkflow runs status run_abc123# View event log including cancellation eventspyworkflow runs logs run_abc123
Only use shield() for truly critical operations like database commits or compensation logic. Long-running shielded operations delay cancellation.
Copy
# Good: Short critical operationasync with shield(): await db.commit()# Bad: Long operation in shieldasync with shield(): await process_million_records() # Defeats cancellation
Add cooperative checks in long steps
For steps that process large amounts of data, add periodic cancellation checks:
Copy
@step()async def batch_process(items: list): ctx = get_context() for i, item in enumerate(items): if i % 100 == 0: # Check every 100 items ctx.check_cancellation() await process_item(item)
Provide cancellation reasons
Include a reason when cancelling for better debugging and audit trails:
Copy
await cancel_workflow( run_id, reason="Customer cancelled order #12345")