Skip to main content

Overview

PyWorkflow supports graceful workflow cancellation. When you cancel a workflow, it will terminate at the next checkpoint rather than being forcefully killed, allowing for proper cleanup.

Graceful Termination

Workflows stop at safe checkpoints, not mid-operation.

Cleanup Support

Catch CancellationError to perform cleanup before terminating.

Shield Critical Code

Use shield() to protect code that must complete.

CLI & API

Cancel via CLI command or programmatic API.

Cancelling a Workflow

Use cancel_workflow() to request cancellation:
from pyworkflow import cancel_workflow

# Request cancellation
cancelled = await cancel_workflow("run_abc123")

# With a reason
cancelled = await cancel_workflow(
    "run_abc123",
    reason="User requested cancellation"
)

# Wait for cancellation to complete
cancelled = await cancel_workflow(
    "run_abc123",
    wait=True,
    timeout=30
)

How Cancellation Works

Cancellation in PyWorkflow is checkpoint-based. The workflow is cancelled at the next checkpoint, not immediately.

Cancellation Checkpoints

Cancellation is checked at these points:
CheckpointWhen
Before each stepBefore @step decorated functions execute
Before sleepBefore await sleep() suspends the workflow
Before hookBefore await hook() suspends the workflow
┌─────────────────────────────────────────────────────────────────────┐
│ Workflow Execution with Cancellation Checkpoints                    │
│                                                                     │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    │
│   │ ✓ Check  │───▶│  Step 1  │───▶│ ✓ Check  │───▶│  Step 2  │    │
│   └──────────┘    └──────────┘    └──────────┘    └──────────┘    │
│                                                         │           │
│                                                         ▼           │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    │
│   │  Step 3  │◀───│ ✓ Check  │◀───│  sleep() │◀───│ ✓ Check  │    │
│   └──────────┘    └──────────┘    └──────────┘    └──────────┘    │
│                                                                     │
│   ✓ Check = Cancellation checkpoint                                │
└─────────────────────────────────────────────────────────────────────┘
Important: Cancellation does NOT interrupt a step that is already executing.If a step takes a long time (e.g., a 10-minute API call), the workflow will only detect cancellation after that step completes. This is by design to avoid leaving operations in an inconsistent state.

Cooperative Cancellation for Long-Running Steps

For steps that run for a long time, you can add cooperative cancellation checks:
from pyworkflow import step, get_context

@step()
async def process_large_dataset(dataset_id: str):
    ctx = get_context()
    dataset = await load_dataset(dataset_id)

    results = []
    for chunk in dataset.chunks():
        # Check for cancellation periodically
        ctx.check_cancellation()

        result = await process_chunk(chunk)
        results.append(result)

    return results
This allows the step to respond to cancellation requests between chunks rather than waiting until the entire dataset is processed.

Handling Cancellation

Workflows can catch CancellationError to perform cleanup before terminating:
from pyworkflow import workflow, step, CancellationError, shield

@workflow()
async def order_workflow(order_id: str):
    try:
        await reserve_inventory(order_id)
        await charge_payment(order_id)
        await create_shipment(order_id)
        return {"status": "completed"}

    except CancellationError:
        # Cleanup on cancellation
        async with shield():
            # This cleanup will complete even if cancelled
            await release_inventory(order_id)
            await refund_payment(order_id)
        raise  # Re-raise to mark workflow as cancelled

The shield() Context Manager

Use shield() to protect critical code from cancellation:
from pyworkflow import shield

async with shield():
    # This code will complete even if cancellation is requested
    await commit_transaction()
    await send_confirmation_email()
While inside a shield() block:
  • ctx.check_cancellation() will not raise CancellationError
  • The cancellation request is preserved
  • Cancellation will take effect after exiting the shield
Don’t use shield() for long-running operations as it defeats the purpose of graceful cancellation.

Workflow States

When a workflow is cancelled, its status transitions to CANCELLED:
┌─────────────┐
│   RUNNING   │
└──────┬──────┘

       │  cancel_workflow()

┌─────────────┐
│  CANCELLED  │
└─────────────┘

┌─────────────┐
│  SUSPENDED  │  (sleeping or waiting for hook)
└──────┬──────┘

       │  cancel_workflow()

┌─────────────┐
│  CANCELLED  │  (immediate, no resume needed)
└─────────────┘
Workflow StateCancellation Behavior
RUNNINGSets cancellation flag; workflow cancelled at next checkpoint
SUSPENDEDImmediately marked as CANCELLED; scheduled resume task is abandoned
COMPLETEDCannot be cancelled (returns False)
FAILEDCannot be cancelled (returns False)
CANCELLEDAlready cancelled (returns False)

Monitoring Cancelled Workflows

Use the CLI to view cancelled workflows:
# List cancelled workflows
pyworkflow runs list --status cancelled

# View details of a cancelled workflow
pyworkflow runs status run_abc123

# View event log including cancellation events
pyworkflow runs logs run_abc123
Example output:
$ pyworkflow runs logs run_abc123 --filter cancellation

1
   Type: cancellation.requested
   Timestamp: 10:30:45.123
   Data: {
     "reason": "User requested cancellation",
     "requested_by": "admin"
   }

2
   Type: workflow.cancelled
   Timestamp: 10:30:45.456
   Data: {
     "reason": "User requested cancellation",
     "cleanup_completed": true
   }

Best Practices

If your workflow allocates resources or makes changes that need to be reversed, catch CancellationError and clean up:
@workflow()
async def managed_workflow():
    resource = await acquire_resource()
    try:
        await use_resource(resource)
    except CancellationError:
        await release_resource(resource)
        raise
Only use shield() for truly critical operations like database commits or compensation logic. Long-running shielded operations delay cancellation.
# Good: Short critical operation
async with shield():
    await db.commit()

# Bad: Long operation in shield
async with shield():
    await process_million_records()  # Defeats cancellation
For steps that process large amounts of data, add periodic cancellation checks:
@step()
async def batch_process(items: list):
    ctx = get_context()
    for i, item in enumerate(items):
        if i % 100 == 0:  # Check every 100 items
            ctx.check_cancellation()
        await process_item(item)
Include a reason when cancelling for better debugging and audit trails:
await cancel_workflow(
    run_id,
    reason="Customer cancelled order #12345"
)

API Reference

cancel_workflow()

async def cancel_workflow(
    run_id: str,
    reason: Optional[str] = None,
    wait: bool = False,
    timeout: Optional[float] = None,
    storage: Optional[StorageBackend] = None,
) -> bool
ParameterTypeDefaultDescription
run_idstrrequiredWorkflow run ID to cancel
reasonstrNoneOptional reason for cancellation
waitboolFalseWait for workflow to reach terminal state
timeoutfloatNoneTimeout in seconds when waiting
storageStorageBackendNoneStorage backend (uses configured default)
Returns: True if cancellation was initiated, False if workflow is already in a terminal state.

CancellationError

class CancellationError(WorkflowError):
    message: str   # Description of the cancellation
    reason: str    # Optional reason (e.g., "user_requested")

shield()

@asynccontextmanager
async def shield() -> AsyncIterator[None]
Context manager that prevents cancellation checks from raising within its scope.

Next Steps