Overview

Long-running workflows can accumulate large event histories that degrade performance. continue_as_new() solves this by completing the current workflow and immediately starting a fresh execution with a clean event history.

Fresh Event History

Each continuation starts with a clean event log.

Chain Tracking

Workflow runs are linked via continued_from_run_id and continued_to_run_id.

State Preservation

Pass state to the new execution via arguments.

Unlimited Duration

Run workflows indefinitely without unbounded history growth.

When to Use Continue-As-New

continue_as_new() is ideal for:
Use Case            Description
Polling Workflows   Continuously poll for updates without accumulating events
Batch Processing    Process large datasets in chunks, resetting history between batches
Recurring Tasks     Daily/weekly reports or scheduled jobs that run indefinitely
Queue Consumers     Process messages from a queue without history growth
Long-Running Sync   Sync data between systems over extended periods

Basic Usage

Call continue_as_new() with the arguments for the new execution:
from pyworkflow import workflow, step, continue_as_new

@step()
async def fetch_batch(offset: int, batch_size: int) -> list:
    """Fetch a batch of items to process."""
    items = await db.query(offset=offset, limit=batch_size)
    return items

@step()
async def process_item(item: dict) -> dict:
    """Process a single item."""
    return await transform(item)

@workflow()
async def batch_processor(offset: int = 0, batch_size: int = 100) -> str:
    """Process items in batches with fresh event history."""
    items = await fetch_batch(offset, batch_size)

    if not items:
        # No more items - workflow complete
        return f"Processed {offset} total items"

    # Process this batch
    for item in items:
        await process_item(item)

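    # Continue with next batch (fresh event history).
    # Advance by the number of items actually fetched so that the final
    # "Processed N total items" count stays accurate after a partial batch.
    continue_as_new(offset=offset + len(items), batch_size=batch_size)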
continue_as_new() never returns - it raises an internal signal that the executor catches. Any code after it will not execute.
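
The "never returns" behavior can be illustrated with a plain-Python sketch. Everything below is a stand-in written for illustration, not PyWorkflow's actual implementation: FakeContinueSignal, fake_continue_as_new, and run_until_done are hypothetical names, and a real executor would also persist events and run records.

```python
from typing import Any, Callable, NoReturn


class FakeContinueSignal(Exception):
    """Hypothetical stand-in for the internal signal; carries the next run's arguments."""

    def __init__(self, args: tuple, kwargs: dict) -> None:
        super().__init__("continue as new")
        self.next_args = args
        self.next_kwargs = kwargs


def fake_continue_as_new(*args: Any, **kwargs: Any) -> NoReturn:
    """Stand-in for continue_as_new(): it always raises, so it never returns."""
    raise FakeContinueSignal(args, kwargs)


def run_until_done(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> tuple[Any, int]:
    """Toy executor: call fn again with the new arguments each time it signals."""
    runs = 0
    while True:
        runs += 1
        try:
            return fn(*args, **kwargs), runs
        except FakeContinueSignal as signal:
            args, kwargs = signal.next_args, signal.next_kwargs


side_effects: list[str] = []


def countdown(remaining: int) -> str:
    if remaining == 0:
        return "done"
    fake_continue_as_new(remaining=remaining - 1)
    side_effects.append("unreachable")  # never runs: the call above raised


result, total_runs = run_until_done(countdown, remaining=3)
```

In this sketch the list side_effects stays empty, because control leaves countdown at the moment the signal is raised.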

How It Works

When continue_as_new() is called:
┌─────────────────────────────────────────────────────────────────────┐
│                     Continue-As-New Flow                             │
│                                                                      │
│   ┌──────────────┐         ┌──────────────┐         ┌──────────────┐│
│   │   Run #1     │────────▶│   Run #2     │────────▶│   Run #3     ││
│   │  offset=0    │         │  offset=100  │         │  offset=200  ││
│   │  ───────     │         │  ───────     │         │  ───────     ││
│   │  10 events   │         │  10 events   │         │  5 events    ││
│   │  CONTINUED   │         │  CONTINUED   │         │  COMPLETED   ││
│   └──────────────┘         └──────────────┘         └──────────────┘│
│         │                        │                        │         │
│         └────────────────────────┴────────────────────────┘         │
│                           Workflow Chain                             │
└─────────────────────────────────────────────────────────────────────┘
  1. The current run is marked as CONTINUED_AS_NEW
  2. A WORKFLOW_CONTINUED_AS_NEW event is recorded
  3. A new run is created with continued_from_run_id set
  4. The new run starts executing with the provided arguments
  5. The new run has a fresh, empty event history
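
The steps above can be modeled with a small in-memory sketch. RunRecord, start_run, and continue_run are hypothetical names used only for illustration; the status name, the event name, and the two continued_* fields are the only parts taken from this page. The new run ID is allocated first here only so that the recorded event can reference it.

```python
from dataclasses import dataclass, field
from itertools import count
from typing import Optional

_run_numbers = count(1)


@dataclass
class RunRecord:
    """Hypothetical in-memory stand-in for a stored workflow run."""

    run_id: str
    kwargs: dict
    status: str = "RUNNING"
    events: list = field(default_factory=list)
    continued_from_run_id: Optional[str] = None
    continued_to_run_id: Optional[str] = None


def start_run(kwargs: dict, continued_from: Optional[str] = None) -> RunRecord:
    """Create a run with an empty event list, optionally linked to a predecessor."""
    return RunRecord(
        run_id=f"run_{next(_run_numbers)}",
        kwargs=kwargs,
        continued_from_run_id=continued_from,
    )


def continue_run(current: RunRecord, new_kwargs: dict) -> RunRecord:
    """Close out the current run and link it to a fresh one."""
    new_run = start_run(new_kwargs, continued_from=current.run_id)  # steps 3-5
    current.status = "CONTINUED_AS_NEW"  # step 1
    current.events.append(  # step 2
        {"type": "WORKFLOW_CONTINUED_AS_NEW", "new_run_id": new_run.run_id}
    )
    current.continued_to_run_id = new_run.run_id
    return new_run


first = start_run({"offset": 0})
first.events.append({"type": "STEP_COMPLETED"})
second = continue_run(first, {"offset": 100})
```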

Patterns

Polling Workflow

@workflow()
async def polling_workflow(cursor: str | None = None, poll_count: int = 0):
    """Poll for updates indefinitely."""
    # Check for new data
    new_cursor, updates = await check_for_updates(cursor)

    if updates:
        for update in updates:
            await process_update(update)

    if new_cursor is None:
        return f"Polling complete after {poll_count + 1} polls"

    # Continue polling with new cursor
    continue_as_new(cursor=new_cursor, poll_count=poll_count + 1)

Recurring Task with Sleep

@workflow()
async def daily_report(day: int = 1):
    """Generate daily reports indefinitely."""
    await generate_report(day)

    # Wait until next day
    await sleep("24h")

    # Continue with next day (fresh history)
    continue_as_new(day=day + 1)

Bounded Iterations

@workflow()
async def bounded_workflow(iteration: int = 1, max_iterations: int = 10):
    """Run for a fixed number of iterations."""
    await do_work(iteration)

    if iteration >= max_iterations:
        return f"Completed {max_iterations} iterations"

    continue_as_new(iteration=iteration + 1, max_iterations=max_iterations)

Tracking Workflow Chains

Use get_workflow_chain() to retrieve all runs in a continuation chain:
from pyworkflow import get_workflow_chain

# Get the full chain from any run in it
chain = await get_workflow_chain("run_abc123")

for run in chain:
    print(f"{run.run_id}: {run.status.value}")
    print(f"  From: {run.continued_from_run_id}")
    print(f"  To: {run.continued_to_run_id}")
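
One way such a lookup could work from any run ID is to rewind to the first run through continued_from_run_id and then walk forward through continued_to_run_id. The sketch below does this over a plain dictionary; RUNS and walk_chain are hypothetical, and this is not a description of how get_workflow_chain() is implemented.

```python
from typing import Optional

# Hypothetical stored runs, keyed by run ID.
RUNS: dict[str, dict[str, Optional[str]]] = {
    "run_a": {"continued_from_run_id": None, "continued_to_run_id": "run_b"},
    "run_b": {"continued_from_run_id": "run_a", "continued_to_run_id": "run_c"},
    "run_c": {"continued_from_run_id": "run_b", "continued_to_run_id": None},
}


def walk_chain(run_id: str, runs: dict) -> list[str]:
    """Return every run ID in the chain, ordered from first to last."""
    # Rewind to the first run in the chain.
    head = run_id
    while runs[head]["continued_from_run_id"] is not None:
        head = runs[head]["continued_from_run_id"]

    # Walk forward, collecting run IDs in order.
    chain = []
    current: Optional[str] = head
    while current is not None:
        chain.append(current)
        current = runs[current]["continued_to_run_id"]
    return chain


chain_from_middle = walk_chain("run_b", RUNS)
```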

Workflow Run Schema

The WorkflowRun schema includes continuation tracking fields:
Field                   Type         Description
continued_from_run_id   str | None   Run ID this execution continued from
continued_to_run_id     str | None   Run ID this execution continued to

Important Behaviors

Arguments Are Required

continue_as_new() requires at least one argument:
# Valid - explicit arguments
continue_as_new(offset=100)
continue_as_new(cursor="abc123", count=5)

# Invalid - will raise ValueError
continue_as_new()  # No arguments provided
Unlike some workflow systems, PyWorkflow does not automatically use the original arguments. You must explicitly pass all arguments needed for the next execution.
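
Because nothing is carried over implicitly, one way to avoid silently dropping a parameter is to keep the workflow's inputs in a single structure and derive the next run's keyword arguments from it. This is a sketch using only the standard library; BatchState, its fields, and next_kwargs are hypothetical names, not part of PyWorkflow.

```python
from dataclasses import asdict, dataclass, replace


@dataclass(frozen=True)
class BatchState:
    """Hypothetical container for every argument the workflow accepts."""

    offset: int = 0
    batch_size: int = 100
    source: str = "orders"


def next_kwargs(state: BatchState, processed: int) -> dict:
    """Return the complete keyword arguments for the next execution."""
    advanced = replace(state, offset=state.offset + processed)
    return asdict(advanced)


kwargs = next_kwargs(BatchState(offset=200, batch_size=50), processed=50)
# Inside a workflow, these would be passed on as: continue_as_new(**kwargs)
```

Unchanged fields such as batch_size and source are carried forward automatically by replace(), so only the values that change need to be named.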

Child Workflows Are Cancelled

When a parent workflow continues as new, all running child workflows are cancelled:
@workflow()
async def parent_workflow(iteration: int = 1):
    # Start child workflow
    handle = await start_child_workflow(child_workflow, "data")

    # When we continue as new, the child is cancelled
    continue_as_new(iteration=iteration + 1)

Cancellation Takes Precedence

If cancellation has been requested for a workflow, continue_as_new() raises CancellationError instead of ContinueAsNewSignal:
@workflow()
async def my_workflow(count: int):
    # If cancellation was requested, this raises CancellationError
    # not ContinueAsNewSignal
    continue_as_new(count=count + 1)
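
The ordering can be pictured as a cancellation check that runs before the continuation signal is raised. The sketch below uses stand-in classes and an explicit cancel_requested flag, all hypothetical; in PyWorkflow the cancellation state comes from the workflow context rather than a parameter.

```python
from typing import Any, NoReturn


class FakeCancellationError(Exception):
    """Hypothetical stand-in for CancellationError."""


class FakeContinueSignal(Exception):
    """Hypothetical stand-in for ContinueAsNewSignal."""


def fake_continue_as_new(cancel_requested: bool, **kwargs: Any) -> NoReturn:
    # Cancellation is checked first, so it wins over continuation.
    if cancel_requested:
        raise FakeCancellationError("workflow is being cancelled")
    raise FakeContinueSignal(kwargs)


def outcome(cancel_requested: bool) -> str:
    """Report which of the two exceptions the call produced."""
    try:
        fake_continue_as_new(cancel_requested, count=2)
    except FakeCancellationError:
        return "cancelled"
    except FakeContinueSignal:
        return "continued"
```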

Status is Terminal

CONTINUED_AS_NEW is a terminal status like COMPLETED or FAILED:
# Cannot cancel a workflow that already continued
result = await cancel_workflow("run_that_continued")
# Returns False
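
A terminal-status check of this kind can be sketched with an enum and a set. RunStatus, its string values, and can_cancel are hypothetical; the real status type may have more members than the three terminal statuses named on this page.

```python
from enum import Enum


class RunStatus(Enum):
    """Hypothetical subset of run statuses, for illustration only."""

    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CONTINUED_AS_NEW = "continued_as_new"


# Statuses after which a run can no longer change.
TERMINAL_STATUSES = frozenset(
    {RunStatus.COMPLETED, RunStatus.FAILED, RunStatus.CONTINUED_AS_NEW}
)


def can_cancel(status: RunStatus) -> bool:
    """A run in a terminal status cannot be cancelled."""
    return status not in TERMINAL_STATUSES
```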

Events

The continuation is recorded as a WORKFLOW_CONTINUED_AS_NEW event:
{
  "type": "workflow.continued_as_new",
  "run_id": "run_abc123",
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "new_run_id": "run_def456",
    "args": "[100]",
    "kwargs": "{\"batch_size\": 50}",
    "reason": null
  }
}
View continuation events with the CLI:
pyworkflow runs logs run_abc123 --filter continued
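
In the event shown above, args and kwargs appear as JSON-encoded strings rather than nested values. Assuming that encoding holds (the example suggests it, but this page does not state it), a consumer needs a second decoding step to recover the arguments. decode_continuation is a hypothetical helper.

```python
import json

# The event from this page, reproduced as raw JSON text.
raw_event = r"""
{
  "type": "workflow.continued_as_new",
  "run_id": "run_abc123",
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "new_run_id": "run_def456",
    "args": "[100]",
    "kwargs": "{\"batch_size\": 50}",
    "reason": null
  }
}
"""


def decode_continuation(event: dict) -> tuple[str, list, dict]:
    """Return (new_run_id, args, kwargs), decoding the string-encoded fields."""
    data = event["data"]
    return data["new_run_id"], json.loads(data["args"]), json.loads(data["kwargs"])


event = json.loads(raw_event)
new_run_id, args, kwargs = decode_continuation(event)
```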

Best Practices

Any workflow that could run indefinitely (polling, queues, recurring tasks) should use continue_as_new() to prevent unbounded event history growth.
# Avoid - an unbounded loop keeps adding events to a single run
@workflow()
async def message_consumer():
    while True:  # Don't do this!
        msg = await get_next_message()
        await process_message(msg)

# Better - use continue_as_new
@workflow()
async def message_consumer(messages_processed: int = 0):
    msg = await get_next_message()
    if msg:
        await process_message(msg)
        continue_as_new(messages_processed=messages_processed + 1)
    return f"Processed {messages_processed} messages"
Only pass the state needed for the next execution. Large payloads increase storage and serialization costs.
# Good - minimal state
continue_as_new(cursor="abc123", processed_count=1000)

# Bad - passing large data
continue_as_new(all_results=huge_list_of_results)
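
The difference in cost can be made concrete by comparing serialized sizes. The sketch below uses JSON as the serializer, which is an assumption made for illustration; this page does not say which format PyWorkflow uses for arguments.

```python
import json

# Minimal state: a cursor and a counter.
small_state = {"cursor": "abc123", "processed_count": 1000}

# Large state: every result accumulated so far (hypothetical data).
large_state = {"all_results": [{"id": i, "status": "ok"} for i in range(1000)]}

small_bytes = len(json.dumps(small_state).encode("utf-8"))
large_bytes = len(json.dumps(large_state).encode("utf-8"))

print(f"minimal state: {small_bytes} bytes, large state: {large_bytes} bytes")
```

The large payload is serialized and stored again on every continuation, so its cost repeats for each run in the chain.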
Include counters or timestamps to track overall progress across the chain:
from datetime import datetime

@workflow()
async def sync_workflow(
    cursor: str | None = None,
    total_synced: int = 0,
    started_at: str | None = None
):
    if started_at is None:
        started_at = datetime.now().isoformat()

    items, new_cursor = await fetch_items(cursor)
    await sync_items(items)

    if new_cursor:
        continue_as_new(
            cursor=new_cursor,
            total_synced=total_synced + len(items),
            started_at=started_at
        )

    return {
        "total_synced": total_synced + len(items),
        "started_at": started_at,
        "completed_at": datetime.now().isoformat()
    }
Always have a termination condition that returns normally:
@workflow()
async def batch_workflow(offset: int = 0):
    items = await fetch_items(offset)

    if not items:
        # Terminal condition - return result
        return {"processed": offset, "status": "complete"}

    await process_items(items)
    continue_as_new(offset=offset + len(items))

API Reference

continue_as_new()

def continue_as_new(*args: Any, **kwargs: Any) -> NoReturn
Complete the current workflow and start a new execution with fresh event history.
Parameter   Type   Description
*args       Any    Positional arguments for the new execution
**kwargs    Any    Keyword arguments for the new execution
Raises:
  • ContinueAsNewSignal - Internal signal caught by the executor
  • ValueError - If no arguments are provided
  • RuntimeError - If called outside a workflow context
  • CancellationError - If workflow is being cancelled

get_workflow_chain()

async def get_workflow_chain(
    run_id: str,
    storage: StorageBackend | None = None,
) -> list[WorkflowRun]
Get all workflow runs in a continuation chain.
Parameter   Type                    Default    Description
run_id      str                     required   Any run ID in the chain
storage     StorageBackend | None   None       Storage backend (the configured default is used when None)
Returns: List of WorkflowRun objects ordered from first to last in the chain.

Next Steps

Sleep

Learn about durable sleep for delays.

Hooks

Wait for external events in your workflows.

Fault Tolerance

Automatic recovery from worker crashes.

CLI Guide

Manage workflows from the command line.