
What are Schedules?

Schedules allow you to automatically trigger workflow executions at specified times or intervals. Instead of manually starting workflows, you can configure them to run:
  • On a cron schedule: Run at specific times (e.g., every day at 9 AM)
  • At regular intervals: Run repeatedly with a fixed delay (e.g., every 5 minutes)
  • On calendar dates: Run on specific days of the month or week
from pyworkflow import scheduled_workflow, OverlapPolicy

@scheduled_workflow(cron="0 9 * * *")
async def daily_report():
    """Runs every day at 9 AM"""
    data = await gather_metrics()
    await generate_report(data)
    return {"status": "report_generated"}

Key Features

Multiple Schedule Types

Choose from cron expressions, intervals, or calendar-based scheduling.

Overlap Policies

Control what happens when a new run is triggered while a previous run is still executing.

Dynamic Management

Create, pause, resume, and delete schedules at runtime without redeploying.

Backfill Support

Catch up on missed runs after downtime with backfill capabilities.

Creating Schedules

There are two ways to create schedules:

Using the Decorator

The @scheduled_workflow decorator combines workflow definition with schedule configuration:
from pyworkflow import scheduled_workflow

@scheduled_workflow(cron="0 9 * * *")  # Every day at 9 AM
async def daily_cleanup():
    await cleanup_old_records()
    return {"cleaned": True}

Using the API

For dynamic schedule creation, use the create_schedule function:
from pyworkflow import create_schedule, ScheduleSpec, OverlapPolicy

# Create a schedule for an existing workflow
schedule = await create_schedule(
    workflow_name="daily_report",
    spec=ScheduleSpec(cron="0 9 * * *"),
    overlap_policy=OverlapPolicy.SKIP,
    schedule_id="daily_report_schedule",  # Optional custom ID
)

print(f"Created schedule: {schedule.schedule_id}")
print(f"Next run: {schedule.next_run_time}")

Schedule Types

Cron Expressions

Cron expressions provide precise control over when workflows run. The format is:
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, Sunday=0)
│ │ │ │ │
* * * * *
Common patterns:
Expression        Description
* * * * *         Every minute
0 * * * *         Every hour
0 9 * * *         Every day at 9 AM
0 9 * * 1         Every Monday at 9 AM
0 0 1 * *         First day of every month at midnight
*/5 * * * *       Every 5 minutes
0 */4 * * *       Every 4 hours
0 9-17 * * 1-5    Hourly from 9 AM to 5 PM, Monday to Friday
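To make the field semantics above concrete, here is a minimal standard-library sketch of how a five-field expression can be checked against a timestamp. It is not PyWorkflow's parser: the helper names `_field_matches` and `cron_matches` are hypothetical, and it makes the simplifying assumption that all five fields must match (classic cron treats day-of-month and day-of-week specially when both are restricted).

```python
from datetime import datetime


def _field_matches(field: str, value: int, lo: int, hi: int) -> bool:
    """Check one cron field; supports *, */n, a-b, a-b/n and comma lists."""
    for part in field.split(","):
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if rng == "*":
            start, end = lo, hi
        elif "-" in rng:
            a, b = rng.split("-")
            start, end = int(a), int(b)
        else:
            start = end = int(rng)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expr: str, t: datetime) -> bool:
    """Return True if `t` falls on a minute selected by `expr`."""
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (t.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0
    return (
        _field_matches(minute, t.minute, 0, 59)
        and _field_matches(hour, t.hour, 0, 23)
        and _field_matches(dom, t.day, 1, 31)
        and _field_matches(month, t.month, 1, 12)
        and _field_matches(dow, cron_dow, 0, 6)
    )


# 2024-01-15 is a Monday
print(cron_matches("0 9 * * 1", datetime(2024, 1, 15, 9, 0)))
```

The weekday conversion is the step most often gotten wrong: Python's `datetime.weekday()` numbers Monday as 0, while the cron format described here numbers Sunday as 0.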

Intervals

Intervals specify a fixed duration between runs:
@scheduled_workflow(interval="5m")   # Every 5 minutes
@scheduled_workflow(interval="1h")   # Every hour
@scheduled_workflow(interval="30s")  # Every 30 seconds
@scheduled_workflow(interval="1d")   # Every day
Supported units:
  • s - seconds
  • m - minutes
  • h - hours
  • d - days
When using intervals, the first run happens immediately when the schedule is created. Subsequent runs occur at the specified interval after each run completes.
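As an illustration of the interval format, the following sketch converts strings such as "5m" into a `timedelta` and derives the next run from the completion time of the previous one. The function names are hypothetical and the parsing rules (a whole number followed by one unit letter) are an assumption based on the examples above, not PyWorkflow's actual parser.

```python
import re
from datetime import datetime, timedelta, timezone

# Map each supported unit suffix to the matching timedelta keyword.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_interval(text: str) -> timedelta:
    """Parse an interval like '30s', '5m', '1h' or '1d'."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"unsupported interval: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def next_run_after(completed_at: datetime, interval: str) -> datetime:
    """Next run time, measured from when the previous run completed."""
    return completed_at + parse_interval(interval)


finished = datetime(2024, 1, 15, 10, 2, 30, tzinfo=timezone.utc)
print(next_run_after(finished, "5m"))
```

Because the delay is measured from completion, a run that takes 2.5 minutes on a "5m" interval starts roughly every 7.5 minutes, not every 5.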

Calendar-Based Schedules

For more complex scheduling needs, use calendar specifications:
from pyworkflow import ScheduleSpec, CalendarSpec

# Run on the 1st and 15th of every month at midnight
spec = ScheduleSpec(
    calendar=[
        CalendarSpec(day_of_month=1, hour=0, minute=0),
        CalendarSpec(day_of_month=15, hour=0, minute=0),
    ]
)
CalendarSpec fields:
Field           Type          Description
second          int           Second (0-59), default: 0
minute          int           Minute (0-59), default: 0
hour            int           Hour (0-23), default: 0
day_of_month    int | None    Day of month (1-31)
month           int | None    Month (1-12)
day_of_week     int | None    Day of week (0-6, Sunday=0)
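One way to reason about these fields is to treat `None` as "any value" and search forward day by day for the first match. The sketch below does that with a hypothetical `CalendarRule` dataclass that mirrors the field names in the table; it is a stand-in for illustration and makes no claim about how PyWorkflow evaluates `CalendarSpec` internally.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class CalendarRule:
    """Hypothetical stand-in with the same fields as CalendarSpec."""
    second: int = 0
    minute: int = 0
    hour: int = 0
    day_of_month: Optional[int] = None
    month: Optional[int] = None
    day_of_week: Optional[int] = None  # 0-6, Sunday=0


def next_occurrence(rule: CalendarRule, after: datetime,
                    horizon_days: int = 1462) -> Optional[datetime]:
    """First datetime strictly after `after` that satisfies `rule`."""
    day = after.replace(hour=0, minute=0, second=0, microsecond=0)
    for _ in range(horizon_days):
        dow = (day.weekday() + 1) % 7  # convert Monday=0 to Sunday=0
        if (rule.day_of_month in (None, day.day)
                and rule.month in (None, day.month)
                and rule.day_of_week in (None, dow)):
            candidate = day.replace(hour=rule.hour, minute=rule.minute,
                                    second=rule.second)
            if candidate > after:
                return candidate
        day += timedelta(days=1)
    return None  # nothing matched within the search horizon


rules = [CalendarRule(day_of_month=1), CalendarRule(day_of_month=15)]
now = datetime(2024, 1, 1, 0, 0)
print(min(next_occurrence(r, now) for r in rules))
```

With several rules in one list, the next run is simply the earliest of the per-rule results, which is what the `min(...)` call computes.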

Overlap Policies

When a schedule triggers while a previous run is still executing, the overlap policy determines what happens:
from pyworkflow import scheduled_workflow, OverlapPolicy

@scheduled_workflow(
    interval="5m",
    overlap_policy=OverlapPolicy.SKIP
)
async def my_workflow():
    pass
Policy          Description
SKIP            Default. Skip the new run if a previous run is still active.
BUFFER_ONE      Queue one run to execute after the current run completes.
BUFFER_ALL      Queue all triggered runs (use with caution).
CANCEL_OTHER    Cancel the running execution and start a new one.
ALLOW_ALL       Allow multiple concurrent executions.
BUFFER_ALL can lead to unbounded queue growth if runs take longer than the schedule interval. Use SKIP or BUFFER_ONE for most use cases.
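The queue-growth warning can be checked with a small simulation. This is a toy model, not PyWorkflow code: it assumes a single worker, fixed run durations, and integer time units, and it covers only SKIP, BUFFER_ONE, and BUFFER_ALL (CANCEL_OTHER and ALLOW_ALL never queue, so they are left out). The function name `simulate` is hypothetical.

```python
def simulate(policy: str, interval: int, duration: int, triggers: int) -> dict:
    """Feed `triggers` ticks, `interval` apart, to one worker."""
    if policy not in ("SKIP", "BUFFER_ONE", "BUFFER_ALL"):
        raise ValueError(f"policy not modelled: {policy}")
    busy_until = 0  # time at which the current run finishes
    queued = 0
    started = skipped = 0
    for i in range(triggers):
        now = i * interval
        # Queued runs start back-to-back as soon as the worker is free.
        while queued and busy_until <= now:
            busy_until += duration
            queued -= 1
            started += 1
        if busy_until <= now:
            busy_until = now + duration  # idle worker: start immediately
            started += 1
        elif policy == "SKIP":
            skipped += 1
        elif policy == "BUFFER_ONE":
            if queued == 0:
                queued = 1
            else:
                skipped += 1
        else:  # BUFFER_ALL
            queued += 1
    return {"started": started, "skipped": skipped, "backlog": queued}


# Runs take 12 time units but are triggered every 5.
for policy in ("SKIP", "BUFFER_ONE", "BUFFER_ALL"):
    print(policy, simulate(policy, interval=5, duration=12, triggers=10))
```

In this model the BUFFER_ALL backlog keeps growing as more triggers arrive, while BUFFER_ONE never holds more than one pending run and SKIP holds none.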

Choosing an Overlap Policy

SKIP: Use when it’s safe to miss a run. The next scheduled run will catch up. Example: metrics collection, status checks.
@scheduled_workflow(interval="1m", overlap_policy=OverlapPolicy.SKIP)
async def collect_metrics():
    # Missing one collection is fine
    await record_current_metrics()
BUFFER_ONE: Use when you need to guarantee at least one run happens after a long-running execution. Example: data synchronization.
@scheduled_workflow(interval="5m", overlap_policy=OverlapPolicy.BUFFER_ONE)
async def sync_data():
    # If sync takes > 5 min, one more will run after
    await sync_all_records()
CANCEL_OTHER: Use when only the most recent run matters and older runs should be cancelled. Example: cache refresh.
@scheduled_workflow(interval="10m", overlap_policy=OverlapPolicy.CANCEL_OTHER)
async def refresh_cache():
    # Cancel old refresh, use latest data
    await rebuild_cache()
ALLOW_ALL: Use when runs are independent and can execute concurrently. Example: processing independent queues.
@scheduled_workflow(interval="1m", overlap_policy=OverlapPolicy.ALLOW_ALL)
async def process_queue():
    # Multiple workers can process simultaneously
    await process_next_batch()

Managing Schedules

Pause and Resume

Temporarily stop a schedule without deleting it:
from pyworkflow import pause_schedule, resume_schedule

# Pause the schedule
await pause_schedule("daily_report_schedule")

# ... later, resume it
schedule = await resume_schedule("daily_report_schedule")
print(f"Resumed. Next run: {schedule.next_run_time}")

Update Schedule

Modify an existing schedule’s configuration:
from pyworkflow import update_schedule, ScheduleSpec, OverlapPolicy

# Change the schedule timing
await update_schedule(
    schedule_id="daily_report_schedule",
    spec=ScheduleSpec(cron="0 10 * * *"),  # Change to 10 AM
)

# Change the overlap policy
await update_schedule(
    schedule_id="daily_report_schedule",
    overlap_policy=OverlapPolicy.BUFFER_ONE,
)

Delete Schedule

Remove a schedule (soft delete - record is preserved for audit):
from pyworkflow import delete_schedule

await delete_schedule("daily_report_schedule")

List Schedules

Query existing schedules:
from pyworkflow import list_schedules, ScheduleStatus

# List all active schedules
schedules = await list_schedules(status=ScheduleStatus.ACTIVE)

for s in schedules:
    print(f"{s.schedule_id}: {s.workflow_name}")
    print(f"  Next run: {s.next_run_time}")
    print(f"  Runs: {s.successful_runs}/{s.total_runs}")

# List schedules for a specific workflow
schedules = await list_schedules(workflow_name="daily_report")

Manual Trigger and Backfill

Manual Trigger

Execute a scheduled workflow immediately, outside of its normal schedule:
from pyworkflow import trigger_schedule

# Trigger the schedule now (doesn't affect regular schedule)
await trigger_schedule("daily_report_schedule")
This is useful for:
  • Testing the workflow
  • Running on-demand when needed
  • Recovering from issues

Backfill Missed Runs

If the scheduler was down and missed some runs, you can backfill them:
from pyworkflow import backfill_schedule
from datetime import datetime, UTC

# Backfill runs that should have occurred during downtime
run_ids = await backfill_schedule(
    schedule_id="hourly_sync_schedule",
    start_time=datetime(2024, 1, 15, 0, 0, 0, tzinfo=UTC),
    end_time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=UTC),
)

print(f"Created {len(run_ids)} backfill runs")
Backfill creates runs for all scheduled times in the range. For high-frequency schedules, this could create many runs. Consider the overlap_policy when backfilling.
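Before backfilling a high-frequency schedule, it can help to estimate how many runs the range covers. The sketch below does this for a fixed-interval schedule using only the standard library. The helper `scheduled_times` is hypothetical, and treating the range as half-open (start included, end excluded) is an assumption; the text above does not say whether `backfill_schedule` includes the end time.

```python
from datetime import datetime, timedelta, timezone


def scheduled_times(start: datetime, end: datetime,
                    interval: timedelta) -> list:
    """List the run times in [start, end) for a fixed-interval schedule."""
    times = []
    t = start
    while t < end:
        times.append(t)
        t += interval
    return times


start = datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)

hourly = scheduled_times(start, end, timedelta(hours=1))
per_minute = scheduled_times(start, end, timedelta(minutes=1))
print(len(hourly), len(per_minute))
```

Under these assumptions the same 12-hour window yields 12 runs for an hourly schedule but 720 for a per-minute schedule, which is why the overlap policy matters when backfilling.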

Timezone Support

Schedules support timezone-aware execution:
@scheduled_workflow(
    cron="0 9 * * *",
    timezone="America/New_York"  # 9 AM Eastern Time
)
async def east_coast_report():
    pass

@scheduled_workflow(
    cron="0 9 * * *",
    timezone="Europe/London"  # 9 AM British Time
)
async def uk_report():
    pass
All schedule times are stored internally as UTC. The timezone is used to calculate the correct UTC time for each run.
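The local-to-UTC conversion can be illustrated with Python's standard `zoneinfo` module. This sketch shows only the general idea and is not taken from PyWorkflow's source; the helper name `local_to_utc` is hypothetical, and it assumes the system has IANA time zone data available.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def local_to_utc(year: int, month: int, day: int, hour: int,
                 tz_name: str) -> datetime:
    """Interpret a wall-clock time in `tz_name` and convert it to UTC."""
    local = datetime(year, month, day, hour, tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)


# 9 AM in New York maps to different UTC hours in winter and summer.
winter = local_to_utc(2024, 1, 15, 9, "America/New_York")
summer = local_to_utc(2024, 7, 15, 9, "America/New_York")
print(winter.isoformat(), summer.isoformat())
```

Because of daylight saving time, the UTC time has to be recalculated for each run from the local wall-clock time; a fixed UTC offset would drift by an hour for part of the year.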

Time Bounds

Limit when a schedule is active:
from datetime import datetime, UTC

@scheduled_workflow(
    cron="0 9 * * *",
    start_at=datetime(2024, 1, 1, tzinfo=UTC),  # Start on Jan 1
    end_at=datetime(2024, 12, 31, tzinfo=UTC),  # End on Dec 31
)
async def annual_workflow():
    pass
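A point worth checking with bounds like these is that `datetime(2024, 12, 31, tzinfo=UTC)` means midnight at the start of December 31. The sketch below assumes the bounds are compared as plain instants, inclusive at both ends; that is an assumption about the semantics, not a statement of how PyWorkflow enforces `start_at` and `end_at`, and `within_bounds` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Optional


def within_bounds(run_time: datetime,
                  start_at: Optional[datetime] = None,
                  end_at: Optional[datetime] = None) -> bool:
    """True if `run_time` lies inside the optional [start_at, end_at] window."""
    if start_at is not None and run_time < start_at:
        return False
    if end_at is not None and run_time > end_at:
        return False
    return True


start_at = datetime(2024, 1, 1, tzinfo=timezone.utc)
end_at = datetime(2024, 12, 31, tzinfo=timezone.utc)

dec_30 = datetime(2024, 12, 30, 9, 0, tzinfo=timezone.utc)
dec_31 = datetime(2024, 12, 31, 9, 0, tzinfo=timezone.utc)
print(within_bounds(dec_30, start_at, end_at),
      within_bounds(dec_31, start_at, end_at))
```

Under this reading the 9 AM run on December 31 falls outside the window. If that run should be included, an end bound such as the start of January 1 would be the safer choice.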

Running the Scheduler

PyWorkflow supports two runtimes for schedule execution: local and Celery.
For development, testing, or single-process deployments, use the local scheduler:
# Start the local scheduler
pyworkflow scheduler run

# With custom poll interval
pyworkflow scheduler run --poll-interval 10

# With a specific module
pyworkflow --module myapp.workflows scheduler run

# Run for a specific duration (useful for testing)
pyworkflow scheduler run --duration 60
The local scheduler polls storage for due schedules and triggers workflows in-process.
Both schedulers:
  1. Poll storage for due schedules (every 5 seconds by default)
  2. Trigger workflow execution for due schedules
  3. Update next_run_time after each run
  4. Handle overlap policies automatically
The schedule primitives (trigger_schedule, backfill_schedule, etc.) are runtime-agnostic and will use whichever runtime is configured. You can switch between local and Celery without changing your schedule management code.

Activating Decorator-Based Schedules

When using @scheduled_workflow, the schedules need to be activated to create records in storage:
from pyworkflow import activate_scheduled_workflows

# Activate all @scheduled_workflow decorated functions
schedule_ids = await activate_scheduled_workflows()
print(f"Activated {len(schedule_ids)} schedules")
Call this during application startup to ensure all decorated workflows have corresponding schedule records.

Complete Examples

Every-Minute Schedule with Celery (Distributed)

This example shows a workflow that runs every minute using Celery workers for distributed execution.
1. Define the scheduled workflow (myapp/workflows.py):
from pyworkflow import scheduled_workflow, step, OverlapPolicy
from datetime import datetime, UTC

@scheduled_workflow(
    cron="* * * * *",  # Every minute
    overlap_policy=OverlapPolicy.SKIP,  # Skip if previous run still active
)
async def minute_health_check():
    """Runs every minute to check system health."""
    result = await check_services()
    await record_metrics(result)
    return {"timestamp": datetime.now(UTC).isoformat(), "status": result}

@step()
async def check_services():
    # Check various services
    return {"api": "healthy", "db": "healthy", "cache": "healthy"}

@step()
async def record_metrics(health_status: dict):
    # Record metrics to monitoring system
    print(f"Health check: {health_status}")
2. Create a startup script (myapp/main.py):
import asyncio
from pyworkflow import configure, activate_scheduled_workflows
from pyworkflow.storage.file import FileStorageBackend

async def setup_schedules():
    # Configure PyWorkflow with file storage
    storage = FileStorageBackend(base_path="./workflow_data")
    configure(
        storage=storage,
        default_durable=True,
    )

    # Activate all @scheduled_workflow decorators
    # This creates schedule records in storage
    schedule_ids = await activate_scheduled_workflows(storage=storage)
    print(f"Activated {len(schedule_ids)} schedule(s)")

if __name__ == "__main__":
    asyncio.run(setup_schedules())
3. Start the services:
# Terminal 1: Run the setup script to activate schedules
python myapp/main.py

# Terminal 2: Start Redis (required for Celery)
docker run -d -p 6379:6379 redis:7-alpine

# Terminal 3: Start Celery worker for workflow execution
pyworkflow --module myapp.workflows worker run

# Terminal 4: Start Celery Beat scheduler
celery -A pyworkflow.celery.app beat \
    --scheduler pyworkflow.celery.scheduler:PyWorkflowScheduler \
    --loglevel INFO
The scheduler will now trigger minute_health_check every minute, and the Celery worker will execute it.

Every-Minute Schedule with Local Runtime

For testing or simple use cases, you can run schedules locally without Celery.
1. Define the workflow (local_schedule.py):
import asyncio
from datetime import datetime, UTC

from pyworkflow import (
    configure,
    workflow,
    step,
    create_schedule,
    ScheduleSpec,
    OverlapPolicy,
    start,
)
from pyworkflow.storage.file import FileStorageBackend
from pyworkflow.utils.schedule import calculate_next_run_time

# Define a simple workflow
@workflow()
async def minute_task():
    """A task that runs every minute."""
    result = await do_work()
    print(f"[{datetime.now(UTC).strftime('%H:%M:%S')}] Task completed: {result}")
    return result

@step()
async def do_work():
    return {"processed_at": datetime.now(UTC).isoformat()}


async def run_local_scheduler():
    """Simple local scheduler loop."""
    # Configure with file storage
    storage = FileStorageBackend(base_path="./workflow_data")
    configure(
        storage=storage,
        default_durable=True,
        default_runtime="local",  # Run in-process, no Celery
    )

    # Create a schedule for our workflow
    spec = ScheduleSpec(cron="* * * * *")  # Every minute

    schedule = await create_schedule(
        workflow_name="minute_task",
        spec=spec,
        overlap_policy=OverlapPolicy.SKIP,
        schedule_id="local_minute_schedule",
        storage=storage,
    )
    print(f"Created schedule: {schedule.schedule_id}")
    print(f"First run at: {schedule.next_run_time}")

    # Simple scheduler loop
    while True:
        now = datetime.now(UTC)

        # Check if schedule is due
        due_schedules = await storage.get_due_schedules(now)

        for sched in due_schedules:
            print(f"\n[{now.strftime('%H:%M:%S')}] Triggering: {sched.workflow_name}")

            # Start the workflow locally
            run_id = await start(minute_task)
            print(f"Started run: {run_id}")

            # Update next run time
            sched.next_run_time = calculate_next_run_time(sched.spec, now=now)
            sched.total_runs += 1
            await storage.update_schedule(sched)

        # Check every 5 seconds
        await asyncio.sleep(5)


if __name__ == "__main__":
    print("Starting local scheduler (Ctrl+C to stop)...")
    asyncio.run(run_local_scheduler())
2. Run it:
python local_schedule.py
Output:
Starting local scheduler (Ctrl+C to stop)...
Created schedule: local_minute_schedule
First run at: 2024-01-15 10:01:00+00:00

[10:01:00] Triggering: minute_task
Started run: run_abc123...
[10:01:00] Task completed: {'processed_at': '2024-01-15T10:01:00.123456+00:00'}

[10:02:00] Triggering: minute_task
Started run: run_def456...
[10:02:00] Task completed: {'processed_at': '2024-01-15T10:02:00.234567+00:00'}
The local runtime is great for development and testing. For production, use the Celery-based approach with pyworkflow worker run --beat for robust, distributed schedule execution.

CLI Commands

Manage schedules from the command line:
# List schedules
pyworkflow schedules list
pyworkflow schedules list --workflow daily_report --status active

# Show schedule details
pyworkflow schedules show sched_abc123

# Create a schedule
pyworkflow schedules create my_workflow --cron "0 9 * * *"
pyworkflow schedules create my_workflow --interval 5m --overlap skip

# Pause and resume
pyworkflow schedules pause sched_abc123
pyworkflow schedules resume sched_abc123

# Manual trigger
pyworkflow schedules trigger sched_abc123

# Backfill
pyworkflow schedules backfill sched_abc123 \
    --start 2024-01-01T00:00:00 \
    --end 2024-01-31T23:59:59

# Delete
pyworkflow schedules delete sched_abc123

Best Practices

  • The SKIP overlap policy is the safest default. It prevents resource exhaustion and ensures predictable behavior.
  • Don’t schedule too frequently. Consider the typical workflow duration when setting intervals to avoid constant overlaps.
  • Scheduled workflows should handle transient failures with retries. Use step-level retries for resilience.
  • Track successful_runs, failed_runs, and skipped_runs to identify issues with your schedules.
  • Design scheduled workflows to be idempotent. They may run multiple times due to retries or backfills.
  • Be explicit about timezones, especially for schedules that should run at specific local times.
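One common way to get idempotency is to derive a key from the workflow name and the scheduled time, then skip any work whose key has already been recorded. The sketch below uses an in-memory set for brevity; a real system would need durable storage. The names `idempotency_key` and `run_once` are hypothetical, as is the assumption that the scheduled time is available to the workflow.

```python
import hashlib
from datetime import datetime, timezone
from typing import Callable


def idempotency_key(workflow_name: str, scheduled_for: datetime) -> str:
    """Stable key for one scheduled occurrence of a workflow."""
    stamp = scheduled_for.astimezone(timezone.utc).isoformat()
    return hashlib.sha256(f"{workflow_name}|{stamp}".encode()).hexdigest()


_completed = set()  # in-memory only; stands in for durable storage


def run_once(workflow_name: str, scheduled_for: datetime,
             action: Callable[[], None]) -> bool:
    """Run `action` unless this occurrence was already processed."""
    key = idempotency_key(workflow_name, scheduled_for)
    if key in _completed:
        return False  # duplicate from a retry or backfill: do nothing
    action()
    _completed.add(key)  # recorded only after the action succeeds
    return True


calls = []
slot = datetime(2024, 1, 15, 9, 0, tzinfo=timezone.utc)
first = run_once("daily_report", slot, lambda: calls.append("ran"))
second = run_once("daily_report", slot, lambda: calls.append("ran"))
print(first, second, len(calls))
```

Keying on the scheduled time, not the wall-clock time at execution, is what lets a retry or a backfill of the same slot be recognized as a duplicate.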

Schedule Status

Schedules can be in one of three states:
Status     Description
ACTIVE     Schedule is running and will trigger at next_run_time
PAUSED     Schedule is temporarily stopped, no runs will trigger
DELETED    Schedule has been soft-deleted

Monitoring

Track schedule execution with built-in statistics:
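# Assumes get_schedule is exported from pyworkflow like the other schedule functions
from pyworkflow import get_schedule

schedule = await get_schedule("my_schedule")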

print(f"Total runs: {schedule.total_runs}")
print(f"Successful: {schedule.successful_runs}")
print(f"Failed: {schedule.failed_runs}")
print(f"Skipped: {schedule.skipped_runs}")
print(f"Last run: {schedule.last_run_at}")
print(f"Next run: {schedule.next_run_time}")
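These counters can be turned into a simple health signal. The sketch below computes a success rate over completed runs and flags schedules that fall below a threshold. `ScheduleStats` is a hypothetical stand-in carrying the same counter names, and the 95% threshold is an arbitrary example value; skipped runs are left out of the rate because the text does not say whether `total_runs` includes them.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScheduleStats:
    """Hypothetical stand-in holding the counters shown above."""
    total_runs: int
    successful_runs: int
    failed_runs: int
    skipped_runs: int


def success_rate(stats: ScheduleStats) -> Optional[float]:
    """Fraction of completed runs that succeeded, or None if none completed."""
    completed = stats.successful_runs + stats.failed_runs
    if completed == 0:
        return None  # avoid dividing by zero for brand-new schedules
    return stats.successful_runs / completed


def needs_attention(stats: ScheduleStats, min_rate: float = 0.95) -> bool:
    """Flag a schedule whose success rate is below `min_rate`."""
    rate = success_rate(stats)
    return rate is not None and rate < min_rate


stats = ScheduleStats(total_runs=100, successful_runs=90,
                      failed_runs=10, skipped_runs=0)
print(success_rate(stats), needs_attention(stats))
```

A rising `skipped_runs` count is a separate signal worth watching: it suggests runs regularly take longer than the schedule interval.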
