Long-running workflows can accumulate large event histories that impact performance. continue_as_new() solves this by completing the current workflow and immediately starting a fresh execution with clean event history.
Fresh Event History: Each continuation starts with a clean event log.
Chain Tracking: Workflow runs are linked via continued_from_run_id and continued_to_run_id.
State Preservation: Pass state to the new execution via arguments.
Unlimited Duration: Run workflows indefinitely without unbounded history growth.
Call continue_as_new() with the arguments for the new execution:
```python
from pyworkflow import workflow, step, continue_as_new

@step()
async def fetch_batch(offset: int, batch_size: int) -> list:
    """Fetch a batch of items to process."""
    items = await db.query(offset=offset, limit=batch_size)
    return items

@step()
async def process_item(item: dict) -> dict:
    """Process a single item."""
    return await transform(item)

@workflow()
async def batch_processor(offset: int = 0, batch_size: int = 100) -> str:
    """Process items in batches with fresh event history."""
    items = await fetch_batch(offset, batch_size)

    if not items:
        # No more items - workflow complete
        return f"Processed {offset} total items"

    # Process this batch
    for item in items:
        await process_item(item)

    # Continue with next batch (fresh event history)
    continue_as_new(offset=offset + batch_size, batch_size=batch_size)
```
continue_as_new() never returns - it raises an internal signal that the executor catches. Any code after it will not execute.
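For example (the cleanup step below is illustrative):

```python
@workflow()
async def looping_workflow(n: int = 0):
    continue_as_new(n=n + 1)
    await cleanup_step()  # Never reached: continue_as_new() raised above
```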
Polling for updates:

```python
@workflow()
async def polling_workflow(cursor: str | None = None, poll_count: int = 0):
    """Poll for updates indefinitely."""
    # Check for new data
    new_cursor, updates = await check_for_updates(cursor)

    if updates:
        for update in updates:
            await process_update(update)

    if new_cursor is None:
        return f"Polling complete after {poll_count + 1} polls"

    # Continue polling with new cursor
    continue_as_new(cursor=new_cursor, poll_count=poll_count + 1)
```
Recurring scheduled work:

```python
@workflow()
async def daily_report(day: int = 1):
    """Generate daily reports indefinitely."""
    await generate_report(day)

    # Wait until next day
    await sleep("24h")

    # Continue with next day (fresh history)
    continue_as_new(day=day + 1)
```
Bounded iteration:

```python
@workflow()
async def bounded_workflow(iteration: int = 1, max_iterations: int = 10):
    """Run for a fixed number of iterations."""
    await do_work(iteration)

    if iteration >= max_iterations:
        return f"Completed {max_iterations} iterations"

    continue_as_new(iteration=iteration + 1, max_iterations=max_iterations)
```
Use get_workflow_chain() to retrieve all runs in a continuation chain:
Python API

```python
from pyworkflow import get_workflow_chain

# Get the full chain from any run in it
chain = await get_workflow_chain("run_abc123")

for run in chain:
    print(f"{run.run_id}: {run.status.value}")
    print(f"  From: {run.continued_from_run_id}")
    print(f"  To: {run.continued_to_run_id}")
```

CLI

```bash
# View the continuation chain
pyworkflow runs chain run_abc123

# Output:
# Continue-As-New Chain
# ────────────────────────────────────────────────────────
# Chain length: 3 run(s)
#
# START
#   Run ID:   run_abc123def456
#   Workflow: batch_processor
#   Status:   continued_as_new
#   Duration: 2.3s
#
# ↓ continued as new
#
# #2
#   Run ID:   run_789xyz123abc
#   Workflow: batch_processor
#   Status:   continued_as_new
#   Duration: 1.8s
#
# ↓ continued as new
#
# CURRENT
#   Run ID:   run_456def789xyz
#   Workflow: batch_processor
#   Status:   completed
#   Duration: 0.5s
```
continue_as_new() requires at least one explicit argument; calling it with none raises ValueError:

```python
# Valid - explicit arguments
continue_as_new(offset=100)
continue_as_new(cursor="abc123", count=5)

# Invalid - will raise ValueError
continue_as_new()  # No arguments provided
```
Unlike some workflow systems, PyWorkflow does not automatically use the original arguments. You must explicitly pass all arguments needed for the next execution.
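One consequence worth noting (a sketch with illustrative names): any parameter you omit falls back to its declared default in the next execution:

```python
@workflow()
async def poller(cursor: str | None = None, batch_size: int = 500):
    new_cursor = await poll(cursor, batch_size)

    # Bug (illustrative): batch_size is not forwarded, so a caller-supplied
    # value (say, 100) is lost and the next run reverts to the default of 500
    continue_as_new(cursor=new_cursor)

    # Correct: forward every parameter the next run needs
    # continue_as_new(cursor=new_cursor, batch_size=batch_size)
```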
When a parent workflow continues as new, all running child workflows are cancelled:
```python
@workflow()
async def parent_workflow(iteration: int = 1):
    # Start child workflow
    handle = await start_child_workflow(child_workflow, "data")

    # When we continue as new, the child is cancelled
    continue_as_new(iteration=iteration + 1)
```
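If the next execution depends on the child's output, wait for the child before continuing. A minimal sketch, assuming the handle exposes an awaitable result() method (this page does not document the handle API):

```python
@workflow()
async def parent_workflow(iteration: int = 1):
    handle = await start_child_workflow(child_workflow, "data")

    # Assumption: handle.result() waits for the child to complete,
    # so it finishes before continue_as_new() would cancel it
    child_output = await handle.result()

    continue_as_new(iteration=iteration + 1, last_output=child_output)
```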
If a workflow is cancelled, continue_as_new() will raise CancellationError instead:
```python
@workflow()
async def my_workflow(count: int):
    # If cancellation was requested, this raises CancellationError,
    # not ContinueAsNewSignal
    continue_as_new(count=count + 1)
```
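If the workflow holds resources, it can catch the cancellation for last-chance cleanup and then re-raise. A sketch, assuming CancellationError is importable from the top-level pyworkflow package (the import path and the release_resources step are assumptions):

```python
from pyworkflow import workflow, continue_as_new, CancellationError  # import path assumed

@workflow()
async def my_workflow(count: int):
    try:
        continue_as_new(count=count + 1)
    except CancellationError:
        await release_resources()  # hypothetical cleanup step
        raise  # re-raise so the run is still recorded as cancelled
```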
Any workflow that could run indefinitely (polling, queues, recurring tasks) should use continue_as_new() to prevent unbounded event history growth.
```python
@workflow()
async def message_consumer():
    while True:  # Don't do this!
        msg = await get_next_message()
        await process_message(msg)

# Better - use continue_as_new
@workflow()
async def message_consumer(messages_processed: int = 0):
    msg = await get_next_message()
    if msg:
        await process_message(msg)
        continue_as_new(messages_processed=messages_processed + 1)
    return f"Processed {messages_processed} messages"
```
Pass minimal state
Only pass the state needed for the next execution. Large payloads increase storage and serialization costs.
```python
# Good - minimal state
continue_as_new(cursor="abc123", processed_count=1000)

# Bad - passing large data
continue_as_new(all_results=huge_list_of_results)
```
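If later runs genuinely need accumulated results, one option is to persist them from a step and forward only a small reference. A sketch: save_results, storage.put, and compute_batch are illustrative names, not PyWorkflow APIs:

```python
from pyworkflow import workflow, step, continue_as_new

@step()
async def save_results(results: list) -> str:
    # Illustrative: write results to external storage, return a lookup key
    return await storage.put(results)

@workflow()
async def accumulator(result_keys: tuple = ()):
    results = await compute_batch()
    key = await save_results(results)

    # Forward only the small reference, not the payload itself
    continue_as_new(result_keys=result_keys + (key,))
```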
Include progress tracking
Include counters or timestamps to track overall progress across the chain:
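For example, a sketch that threads a counter and a start timestamp through the chain (fetch_batch and process_item are the steps from the earlier example; the current_time step is hypothetical):

```python
from datetime import datetime, timezone

from pyworkflow import workflow, step, continue_as_new

@step()
async def current_time() -> str:
    # Capture wall-clock time inside a step rather than in workflow code
    return datetime.now(timezone.utc).isoformat()

@workflow()
async def tracked_processor(processed: int = 0, started_at: str | None = None):
    if started_at is None:
        started_at = await current_time()

    items = await fetch_batch(processed, 100)
    if not items:
        return f"Processed {processed} items since {started_at}"

    for item in items:
        await process_item(item)

    # The counter and the original timestamp survive every continuation
    continue_as_new(processed=processed + len(items), started_at=started_at)
```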