Overview

Long-running workflows can accumulate large event histories that slow replay and increase storage costs. continue_as_new() solves this by completing the current workflow and immediately starting a fresh execution with a clean event history.

Fresh Event History

Each continuation starts with a clean event log.

Chain Tracking

Workflow runs are linked via continued_from_run_id and continued_to_run_id.

State Preservation

Pass state to the new execution via arguments.

Unlimited Duration

Run workflows indefinitely without unbounded history growth.

When to Use Continue-As-New

continue_as_new() is ideal for:
Use Case            Description
-----------------   ------------------------------------------------------------------
Polling Workflows   Continuously poll for updates without accumulating events
Batch Processing    Process large datasets in chunks, resetting history between batches
Recurring Tasks     Daily/weekly reports or scheduled jobs that run indefinitely
Queue Consumers     Process messages from a queue without history growth
Long-Running Sync   Sync data between systems over extended periods

Basic Usage

Call continue_as_new() with the arguments for the new execution:
from pyworkflow import workflow, step, continue_as_new

@step()
async def fetch_batch(offset: int, batch_size: int) -> list:
    """Fetch a batch of items to process."""
    items = await db.query(offset=offset, limit=batch_size)
    return items

@step()
async def process_item(item: dict) -> dict:
    """Process a single item."""
    return await transform(item)

@workflow()
async def batch_processor(offset: int = 0, batch_size: int = 100) -> str:
    """Process items in batches with fresh event history."""
    items = await fetch_batch(offset, batch_size)

    if not items:
        # No more items - workflow complete
        return f"Processed {offset} total items"

    # Process this batch
    for item in items:
        await process_item(item)

    # Continue with next batch (fresh event history)
    continue_as_new(offset=offset + batch_size, batch_size=batch_size)

continue_as_new() never returns; it raises an internal signal that the executor catches. Any code after the call will not execute.
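
For intuition, here is a minimal sketch of what catching that signal could look like. ContinueAsNewSignal is named in the API reference below, but its import path, its .args/.kwargs attributes, and this loop are assumptions for illustration, not PyWorkflow's actual executor:

from pyworkflow import ContinueAsNewSignal  # import path assumed

async def run_once(fn, *args, **kwargs):
    """Illustrative executor step: returns ("COMPLETED", result), or
    ("CONTINUED_AS_NEW", next_args) when the workflow continues."""
    try:
        return ("COMPLETED", await fn(*args, **kwargs))
    except ContinueAsNewSignal as sig:
        # The signal carries the arguments for the next execution
        return ("CONTINUED_AS_NEW", (sig.args, sig.kwargs))  # attributes assumed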

How It Works

When continue_as_new() is called:
┌─────────────────────────────────────────────────────────────────────┐
│                     Continue-As-New Flow                             │
│                                                                      │
│   ┌──────────────┐         ┌──────────────┐         ┌──────────────┐│
│   │   Run #1     │────────▶│   Run #2     │────────▶│   Run #3     ││
│   │  offset=0    │         │  offset=100  │         │  offset=200  ││
│   │  ───────     │         │  ───────     │         │  ───────     ││
│   │  10 events   │         │  10 events   │         │  5 events    ││
│   │  CONTINUED   │         │  CONTINUED   │         │  COMPLETED   ││
│   └──────────────┘         └──────────────┘         └──────────────┘│
│         │                        │                        │         │
│         └────────────────────────┴────────────────────────┘         │
│                           Workflow Chain                             │
└─────────────────────────────────────────────────────────────────────┘
  1. Current run is marked as CONTINUED_AS_NEW
  2. A WORKFLOW_CONTINUED_AS_NEW event is recorded
  3. A new run is created with continued_from_run_id set
  4. The new run starts executing with the provided arguments
  5. New run has fresh, empty event history
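
A quick way to confirm this linkage is to walk the chain after the first continuation. A sketch using get_workflow_chain (covered below), with placeholder run IDs and the exact status string left unverified:

from pyworkflow import get_workflow_chain

chain = await get_workflow_chain("run_1")
first, second = chain[0], chain[1]
assert first.continued_to_run_id == second.run_id
assert second.continued_from_run_id == first.run_id
print(first.status.value)  # expected to reflect CONTINUED_AS_NEW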

Patterns

Polling Workflow

@workflow()
async def polling_workflow(cursor: str | None = None, poll_count: int = 0):
    """Poll for updates indefinitely."""
    # Check for new data
    new_cursor, updates = await check_for_updates(cursor)

    if updates:
        for update in updates:
            await process_update(update)

    if new_cursor is None:
        return f"Polling complete after {poll_count + 1} polls"

    # Continue polling with new cursor
    continue_as_new(cursor=new_cursor, poll_count=poll_count + 1)

Recurring Task with Sleep

@workflow()
async def daily_report(day: int = 1):
    """Generate daily reports indefinitely."""
    await generate_report(day)

    # Wait until next day
    await sleep("24h")

    # Continue with next day (fresh history)
    continue_as_new(day=day + 1)

Bounded Iterations

@workflow()
async def bounded_workflow(iteration: int = 1, max_iterations: int = 10):
    """Run for a fixed number of iterations."""
    await do_work(iteration)

    if iteration >= max_iterations:
        return f"Completed {max_iterations} iterations"

    continue_as_new(iteration=iteration + 1, max_iterations=max_iterations)

Tracking Workflow Chains

Use get_workflow_chain() to retrieve all runs in a continuation chain:
from pyworkflow import get_workflow_chain

# Get the full chain from any run in it
chain = await get_workflow_chain("run_abc123")

for run in chain:
    print(f"{run.run_id}: {run.status.value}")
    print(f"  From: {run.continued_from_run_id}")
    print(f"  To: {run.continued_to_run_id}")

Workflow Run Schema

The WorkflowRun schema includes continuation tracking fields:
Field                   Type         Description
---------------------   ----------   -------------------------------------
continued_from_run_id   str | None   Run ID this execution continued from
continued_to_run_id     str | None   Run ID this execution continued to
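
For orientation, a simplified sketch of how these fields sit on a run record. This is an illustrative stand-in, not the actual WorkflowRun definition:

from dataclasses import dataclass

@dataclass
class WorkflowRunSketch:  # stand-in for pyworkflow's WorkflowRun
    run_id: str
    status: str                                # e.g. CONTINUED_AS_NEW
    continued_from_run_id: str | None = None   # set on runs 2..N of a chain
    continued_to_run_id: str | None = None     # set once this run continues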

Important Behaviors

Arguments Are Required

continue_as_new() requires at least one argument:
# Valid - explicit arguments
continue_as_new(offset=100)
continue_as_new(cursor="abc123", count=5)

# Invalid - will raise ValueError
continue_as_new()  # No arguments provided
Unlike some workflow systems, PyWorkflow does not automatically use the original arguments. You must explicitly pass all arguments needed for the next execution.
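
Since nothing carries over automatically, it helps to forward every argument in a single call so none is silently dropped. A sketch; fetch_next_cursor is a hypothetical step:

@workflow()
async def region_sync(cursor: str | None = None, region: str = "us", synced: int = 0):
    new_cursor, count = await fetch_next_cursor(cursor, region)  # hypothetical step
    if new_cursor is None:
        return f"Synced {synced + count} items in {region}"
    # Forward ALL state the next run needs, including unchanged values like region
    continue_as_new(cursor=new_cursor, region=region, synced=synced + count)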

Child Workflows Are Cancelled

When a parent workflow continues as new, all running child workflows are cancelled:
@workflow()
async def parent_workflow(iteration: int = 1):
    # Start child workflow
    handle = await start_child_workflow(child_workflow, "data")

    # When we continue as new, the child is cancelled
    continue_as_new(iteration=iteration + 1)
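
If the child's work must not be lost, wait for it before continuing. The handle's result() awaitable is an assumption here; adapt it to the actual child-workflow handle API:

@workflow()
async def parent_workflow(iteration: int = 1):
    handle = await start_child_workflow(child_workflow, "data")

    # Finish the child first so continue-as-new has nothing to cancel
    await handle.result()  # assumed handle API

    continue_as_new(iteration=iteration + 1)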

Cancellation Takes Precedence

If a workflow is cancelled, continue_as_new() will raise CancellationError instead:
@workflow()
async def my_workflow(count: int):
    # If cancellation was requested, this raises CancellationError
    # not ContinueAsNewSignal
    continue_as_new(count=count + 1)
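
If cleanup is needed before the cancellation takes effect, one option is to catch and re-raise. A sketch assuming CancellationError is importable from pyworkflow and that re-raising lets the cancellation complete:

from pyworkflow import CancellationError  # import path assumed

@workflow()
async def my_workflow(count: int):
    try:
        continue_as_new(count=count + 1)
    except CancellationError:
        await release_lock()  # hypothetical cleanup step
        raise                 # re-raise so the run is actually cancelled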

Status Is Terminal

CONTINUED_AS_NEW is a terminal status like COMPLETED or FAILED:
# Cannot cancel a workflow that already continued
result = await cancel_workflow("run_that_continued")
# Returns False

Events

The continuation is recorded as a WORKFLOW_CONTINUED_AS_NEW event:
{
  "type": "workflow.continued_as_new",
  "run_id": "run_abc123",
  "timestamp": "2025-01-15T10:30:00Z",
  "data": {
    "new_run_id": "run_def456",
    "args": "[100]",
    "kwargs": "{\"batch_size\": 50}",
    "reason": null
  }
}
View continuation events with the CLI:
pyworkflow runs logs run_abc123 --filter continued

Best Practices

Any workflow that could run indefinitely (polling, queues, recurring tasks) should use continue_as_new() to prevent unbounded event history growth.
@workflow()
async def message_consumer():
    while True:  # Don't do this!
        msg = await get_next_message()
        await process_message(msg)

# Better - use continue_as_new
@workflow()
async def message_consumer(messages_processed: int = 0):
    msg = await get_next_message()
    if msg:
        await process_message(msg)
        continue_as_new(messages_processed=messages_processed + 1)
    return f"Processed {messages_processed} messages"
Only pass the state needed for the next execution. Large payloads increase storage and serialization costs.
# Good - minimal state
continue_as_new(cursor="abc123", processed_count=1000)

# Bad - passing large data
continue_as_new(all_results=huge_list_of_results)
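When the next run truly depends on large intermediate data, one approach is to persist it in a step and hand only a reference across the continuation. blob_store is a hypothetical storage client:

from uuid import uuid4

@step()
async def save_results(results: list) -> str:
    """Persist results externally and return a small reference key."""
    key = f"results/{uuid4()}"
    await blob_store.put(key, results)  # hypothetical storage client
    return key

# Inside the workflow: pass only the reference, not the payload
key = await save_results(huge_list_of_results)
continue_as_new(results_key=key, processed_count=1000)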
Include counters or timestamps to track overall progress across the chain:
@workflow()
async def sync_workflow(
    cursor: str | None = None,
    total_synced: int = 0,
    started_at: str | None = None
):
    if started_at is None:
        started_at = datetime.now().isoformat()

    items, new_cursor = await fetch_items(cursor)
    await sync_items(items)

    if new_cursor:
        continue_as_new(
            cursor=new_cursor,
            total_synced=total_synced + len(items),
            started_at=started_at
        )

    return {
        "total_synced": total_synced + len(items),
        "started_at": started_at,
        "completed_at": datetime.now().isoformat()
    }
Always have a termination condition that returns normally:
@workflow()
async def batch_workflow(offset: int = 0):
    items = await fetch_items(offset)

    if not items:
        # Terminal condition - return result
        return {"processed": offset, "status": "complete"}

    await process_items(items)
    continue_as_new(offset=offset + len(items))

API Reference

continue_as_new()

def continue_as_new(*args: Any, **kwargs: Any) -> NoReturn
Complete the current workflow and start a new execution with fresh event history.
Parameter   Type   Description
---------   ----   ------------------------------------------
*args       Any    Positional arguments for the new execution
**kwargs    Any    Keyword arguments for the new execution
Raises:
  • ContinueAsNewSignal - Internal signal caught by the executor
  • ValueError - If no arguments are provided
  • RuntimeError - If called outside a workflow context
  • CancellationError - If workflow is being cancelled

get_workflow_chain()

async def get_workflow_chain(
    run_id: str,
    storage: StorageBackend | None = None,
) -> list[WorkflowRun]
Get all workflow runs in a continuation chain.
Parameter   Type                    Default    Description
---------   ---------------------   --------   ---------------------------------------------
run_id      str                     required   Any run ID in the chain
storage     StorageBackend | None   None       Storage backend (uses the configured default)
Returns: List of WorkflowRun objects ordered from first to last in the chain.
