PyWorkflow supports graceful workflow cancellation. When you cancel a workflow, it will terminate at the next checkpoint rather than being forcefully killed, allowing for proper cleanup.
Graceful Termination
Workflows stop at safe checkpoints, not mid-operation.
Cleanup Support
Catch CancellationError to perform cleanup before terminating.
Important: Cancellation does NOT interrupt a step that is already executing.If a step takes a long time (e.g., a 10-minute API call), the workflow will only detect cancellation after that step completes. This is by design to avoid leaving operations in an inconsistent state.
For steps that run for a long time, you can add cooperative cancellation checks:
Copy
from pyworkflow import step, get_context@step()async def process_large_dataset(dataset_id: str): ctx = get_context() dataset = await load_dataset(dataset_id) results = [] for chunk in dataset.chunks(): # Check for cancellation periodically await ctx.check_cancellation() result = await process_chunk(chunk) results.append(result) return results
The check_cancellation() method is async because in durable mode it queries the storage backend’s cancellation flag, enabling detection of external cancellation requests (e.g., from cancel_workflow()). It checks the in-memory flag first as a fast path, then falls back to storage if needed.This allows the step to respond to cancellation requests between chunks rather than waiting until the entire dataset is processed.
Workflows can catch CancellationError to perform cleanup before terminating:
Copy
from pyworkflow import workflow, step, CancellationError, shield@workflow()async def order_workflow(order_id: str): try: await reserve_inventory(order_id) await charge_payment(order_id) await create_shipment(order_id) return {"status": "completed"} except CancellationError: # Cleanup on cancellation async with shield(): # This cleanup will complete even if cancelled await release_inventory(order_id) await refund_payment(order_id) raise # Re-raise to mark workflow as cancelled
Use shield() to protect critical code from cancellation:
Copy
from pyworkflow import shieldasync with shield(): # This code will complete even if cancellation is requested await commit_transaction() await send_confirmation_email()
While inside a shield() block:
await ctx.check_cancellation() will not raise CancellationError
The cancellation request is preserved
Cancellation will take effect after exiting the shield
Don’t use shield() for long-running operations as it defeats the purpose of graceful cancellation.
# List cancelled workflowspyworkflow runs list --status cancelled# View details of a cancelled workflowpyworkflow runs status run_abc123# View event log including cancellation eventspyworkflow runs logs run_abc123
Only use shield() for truly critical operations like database commits or compensation logic. Long-running shielded operations delay cancellation.
Copy
# Good: Short critical operationasync with shield(): await db.commit()# Bad: Long operation in shieldasync with shield(): await process_million_records() # Defeats cancellation
Add cooperative checks in long steps
For steps that process large amounts of data, add periodic cancellation checks:
Copy
@step()async def batch_process(items: list): ctx = get_context() for i, item in enumerate(items): if i % 100 == 0: # Check every 100 items await ctx.check_cancellation() await process_item(item)
Provide cancellation reasons
Include a reason when cancelling for better debugging and audit trails:
Copy
await cancel_workflow( run_id, reason="Customer cancelled order #12345")