Schedules allow you to automatically trigger workflow executions at specified times or intervals. Instead of manually starting workflows, you can configure them to run:
On a cron schedule: Run at specific times (e.g., every day at 9 AM)
At regular intervals: Run repeatedly with a fixed delay (e.g., every 5 minutes)
On calendar dates: Run on specific days of the month or week
from pyworkflow import scheduled_workflow, OverlapPolicy

@scheduled_workflow(cron="0 9 * * *")
async def daily_report():
    """Runs every day at 9 AM"""
    data = await gather_metrics()
    await generate_report(data)
    return {"status": "report_generated"}
@scheduled_workflow(interval="5m") # Every 5 minutes@scheduled_workflow(interval="1h") # Every hour@scheduled_workflow(interval="30s") # Every 30 seconds@scheduled_workflow(interval="1d") # Every day
Supported units:
s - seconds
m - minutes
h - hours
d - days
When using intervals, the first run happens immediately when the schedule is created. Subsequent runs occur at the specified interval after each run completes.
For more complex scheduling needs, use calendar specifications:
from pyworkflow import ScheduleSpec, CalendarSpec

# Run on the 1st and 15th of every month at midnight
spec = ScheduleSpec(
    calendar=[
        CalendarSpec(day_of_month=1, hour=0, minute=0),
        CalendarSpec(day_of_month=15, hour=0, minute=0),
    ]
)
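The overview above also mentions running on specific days of the week. Standard cron expressions (the decorator's cron argument shown earlier) already cover that case; the snippet below is a small illustration, with weekly_summary as a made-up workflow name:

from pyworkflow import scheduled_workflow

# The fifth cron field selects the weekday (1 = Monday)
@scheduled_workflow(cron="0 9 * * 1")
async def weekly_summary():
    """Runs every Monday at 9 AM."""
    return {"status": "summary_generated"}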
Overlap policies control what happens when a scheduled run becomes due while a previous run is still executing.
SKIP - Skip overlapping runs
Use when it's safe to miss a run. The next scheduled run will catch up.
Example: Metrics collection, status checks
@scheduled_workflow(interval="1m", overlap_policy=OverlapPolicy.SKIP)async def collect_metrics(): # Missing one collection is fine await record_current_metrics()
BUFFER_ONE - Ensure at least one catchup run
Use when you need to guarantee at least one run happens after a long-running execution.
Example: Data synchronization
@scheduled_workflow(interval="5m", overlap_policy=OverlapPolicy.BUFFER_ONE)async def sync_data(): # If sync takes > 5 min, one more will run after await sync_all_records()
CANCEL_OTHER - Latest data wins
Use when only the most recent run matters and older runs should be cancelled.
Example: Cache refresh
@scheduled_workflow(interval="10m", overlap_policy=OverlapPolicy.CANCEL_OTHER)async def refresh_cache(): # Cancel old refresh, use latest data await rebuild_cache()
ALLOW_ALL - Parallel processing
Use when runs are independent and can execute concurrently.
Example: Processing independent queues
@scheduled_workflow(interval="1m", overlap_policy=OverlapPolicy.ALLOW_ALL)async def process_queue(): # Multiple workers can process simultaneously await process_next_batch()
You can list existing schedules and inspect their status with list_schedules:

from pyworkflow import list_schedules, ScheduleStatus

# List all active schedules
schedules = await list_schedules(status=ScheduleStatus.ACTIVE)
for s in schedules:
    print(f"{s.schedule_id}: {s.workflow_name}")
    print(f"  Next run: {s.next_run_time}")
    print(f"  Runs: {s.successful_runs}/{s.total_runs}")

# List schedules for a specific workflow
schedules = await list_schedules(workflow_name="daily_report")
If the scheduler was down and missed some runs, you can backfill them:
from pyworkflow import backfill_schedule
from datetime import datetime, UTC

# Backfill runs that should have occurred during downtime
run_ids = await backfill_schedule(
    schedule_id="hourly_sync_schedule",
    start_time=datetime(2024, 1, 15, 0, 0, 0, tzinfo=UTC),
    end_time=datetime(2024, 1, 15, 12, 0, 0, tzinfo=UTC),
)
print(f"Created {len(run_ids)} backfill runs")
Backfill creates runs for all scheduled times in the range. For high-frequency schedules, this could create many runs. Consider the overlap_policy when backfilling.
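To get a rough sense of how many runs a backfill window will produce, plain datetime arithmetic is enough. This sketch assumes an interval-based schedule (here 1 minute) and does not touch the PyWorkflow API:

from datetime import datetime, timedelta, UTC

# Backfill window matching the example above, for a "1m" interval schedule
start = datetime(2024, 1, 15, 0, 0, 0, tzinfo=UTC)
end = datetime(2024, 1, 15, 12, 0, 0, tzinfo=UTC)
interval = timedelta(minutes=1)

# Number of scheduled times that fall inside the window
expected_runs = int((end - start) / interval)
print(f"Backfilling this window would create about {expected_runs} runs")  # ~720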
PyWorkflow supports two runtimes for schedule execution:
Local Runtime
Celery Runtime (Distributed)
For development, testing, or single-process deployments, use the local scheduler:
# Start the local scheduler
pyworkflow scheduler run

# With custom poll interval
pyworkflow scheduler run --poll-interval 10

# With a specific module
pyworkflow --module myapp.workflows scheduler run

# Run for a specific duration (useful for testing)
pyworkflow scheduler run --duration 60
The local scheduler polls storage for due schedules and triggers workflows in-process.
For production and distributed execution, use Celery Beat:
# Start Celery Beat with PyWorkflow scheduler
celery -A pyworkflow.celery.app beat \
    --scheduler pyworkflow.celery.scheduler:PyWorkflowScheduler \
    --loglevel INFO
Or use the CLI:
# Start a worker with beat scheduler
pyworkflow worker run --beat
Celery Beat dispatches workflows to distributed workers for parallel execution.
Both schedulers:
Poll storage for due schedules (every 5 seconds by default)
Trigger workflow execution for due schedules
Update next_run_time after each run
Handle overlap policies automatically
The schedule primitives (trigger_schedule, backfill_schedule, etc.) are runtime-agnostic
and will use whichever runtime is configured. You can switch between local and Celery
without changing your schedule management code.
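For example, a schedule can be fired manually with the same code under either runtime. The exact signature of trigger_schedule is an assumption in this sketch (a schedule_id argument and a returned run ID), so check the API reference for your version:

from pyworkflow import trigger_schedule

# Fire a schedule outside its normal cadence.
# Assumed signature: trigger_schedule(schedule_id=...) -> run ID.
run_id = await trigger_schedule(schedule_id="hourly_sync_schedule")
print(f"Triggered run: {run_id}")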
This example shows a workflow that runs every minute, using Celery workers for distributed execution.
1. Define the scheduled workflow (myapp/workflows.py):
from pyworkflow import scheduled_workflow, step, OverlapPolicy
from datetime import datetime, UTC

@scheduled_workflow(
    cron="* * * * *",  # Every minute
    overlap_policy=OverlapPolicy.SKIP,  # Skip if previous run still active
)
async def minute_health_check():
    """Runs every minute to check system health."""
    result = await check_services()
    await record_metrics(result)
    return {"timestamp": datetime.now(UTC).isoformat(), "status": result}

@step()
async def check_services():
    # Check various services
    return {"api": "healthy", "db": "healthy", "cache": "healthy"}

@step()
async def record_metrics(health_status: dict):
    # Record metrics to monitoring system
    print(f"Health check: {health_status}")
2. Create a startup script (myapp/main.py):
import asyncio

from pyworkflow import configure, activate_scheduled_workflows
from pyworkflow.storage.file import FileStorageBackend

async def setup_schedules():
    # Configure PyWorkflow with file storage
    storage = FileStorageBackend(base_path="./workflow_data")
    configure(
        storage=storage,
        default_durable=True,
    )

    # Activate all @scheduled_workflow decorators
    # This creates schedule records in storage
    schedule_ids = await activate_scheduled_workflows(storage=storage)
    print(f"Activated {len(schedule_ids)} schedule(s)")

if __name__ == "__main__":
    asyncio.run(setup_schedules())
3. Start the services:
# Terminal 1: Run the setup script to activate schedules
python myapp/main.py

# Terminal 2: Start Redis (required for Celery)
docker run -d -p 6379:6379 redis:7-alpine

# Terminal 3: Start Celery worker for workflow execution
pyworkflow --module myapp.workflows worker run

# Terminal 4: Start Celery Beat scheduler
celery -A pyworkflow.celery.app beat \
    --scheduler pyworkflow.celery.scheduler:PyWorkflowScheduler \
    --loglevel INFO
The scheduler will now trigger minute_health_check every minute, and the Celery worker will execute it.
For testing or simple use cases, you can run schedules locally without Celery.
1. Define the workflow (local_schedule.py):
import asyncio
from datetime import datetime, UTC

from pyworkflow import (
    configure,
    workflow,
    step,
    create_schedule,
    ScheduleSpec,
    OverlapPolicy,
    start,
)
from pyworkflow.storage.file import FileStorageBackend
from pyworkflow.utils.schedule import calculate_next_run_time

# Define a simple workflow
@workflow()
async def minute_task():
    """A task that runs every minute."""
    result = await do_work()
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Task completed: {result}")
    return result

@step()
async def do_work():
    return {"processed_at": datetime.now(UTC).isoformat()}

async def run_local_scheduler():
    """Simple local scheduler loop."""
    # Configure with file storage
    storage = FileStorageBackend(base_path="./workflow_data")
    configure(
        storage=storage,
        default_durable=True,
        default_runtime="local",  # Run in-process, no Celery
    )

    # Create a schedule for our workflow
    spec = ScheduleSpec(cron="* * * * *")  # Every minute
    schedule = await create_schedule(
        workflow_name="minute_task",
        spec=spec,
        overlap_policy=OverlapPolicy.SKIP,
        schedule_id="local_minute_schedule",
        storage=storage,
    )
    print(f"Created schedule: {schedule.schedule_id}")
    print(f"First run at: {schedule.next_run_time}")

    # Simple scheduler loop
    while True:
        now = datetime.now(UTC)

        # Check if schedule is due
        due_schedules = await storage.get_due_schedules(now)
        for sched in due_schedules:
            print(f"\n[{now.strftime('%H:%M:%S')}] Triggering: {sched.workflow_name}")

            # Start the workflow locally
            run_id = await start(minute_task)
            print(f"Started run: {run_id}")

            # Update next run time
            sched.next_run_time = calculate_next_run_time(sched.spec, now=now)
            sched.total_runs += 1
            await storage.update_schedule(sched)

        # Check every 5 seconds
        await asyncio.sleep(5)

if __name__ == "__main__":
    print("Starting local scheduler (Ctrl+C to stop)...")
    asyncio.run(run_local_scheduler())
2. Run it:
python local_schedule.py
Output:
Starting local scheduler (Ctrl+C to stop)...
Created schedule: local_minute_schedule
First run at: 2024-01-15 10:01:00+00:00

[10:01:00] Triggering: minute_task
Started run: run_abc123...
[10:01:00] Task completed: {'processed_at': '2024-01-15T10:01:00.123456+00:00'}

[10:02:00] Triggering: minute_task
Started run: run_def456...
[10:02:00] Task completed: {'processed_at': '2024-01-15T10:02:00.234567+00:00'}
The local runtime is great for development and testing. For production, use the Celery-based approach with pyworkflow worker run --beat for robust, distributed schedule execution.