
What is Sleep?

The sleep() primitive pauses a workflow for a specified duration. Unlike a traditional sleep, which blocks a thread, PyWorkflow’s sleep suspends the workflow completely: the worker is released and no resources are consumed during the sleep period.
from pyworkflow import workflow, sleep

@workflow()
async def reminder_sequence(user_id: str):
    await send_reminder(user_id, "First reminder")

    # Workflow suspends here - zero resources used
    await sleep("1d")

    # Resumes automatically after 1 day
    await send_reminder(user_id, "Second reminder")

    await sleep("7d")

    await send_reminder(user_id, "Final reminder")

How It Works

Workflow Execution
        │
        ▼
┌───────────────┐
│ Execute Steps │
└───────┬───────┘
        │
        ▼
┌───────────────┐
│ sleep("1d")   │
└───────┬───────┘
        │
        ├─── 1. Record sleep_started event
        │
        ├─── 2. Schedule Celery Beat task for wake time
        │
        ├─── 3. Raise SuspensionSignal
        │
        └─── 4. Worker is freed

                 │  ... 1 day passes ...
                 ▼
        ┌─────────────────┐
        │ Celery Beat     │
        │ triggers resume │
        └────────┬────────┘
                 │
                 ▼
        ┌───────────────┐
        │ Replay events │
        │ Resume work   │
        └───────────────┘
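
The suspend-and-replay cycle in the diagram can be illustrated with a toy model. This is only a sketch of the general technique, not PyWorkflow's implementation: `ToyContext`, `run_until_done`, and the `step_completed` and `sleep_completed` event names are hypothetical, and the loop that appends `sleep_completed` stands in for Celery Beat firing at the wake time.

```python
import asyncio


class SuspensionSignal(Exception):
    """Raised to unwind the workflow so the worker can be released."""

    def __init__(self, sleep_id: int):
        super().__init__(f"suspended at sleep #{sleep_id}")
        self.sleep_id = sleep_id


class ToyContext:
    """Hypothetical context: a persisted event log plus a per-run sleep counter."""

    def __init__(self, events: list):
        self.events = events      # survives across runs
        self.sleep_counter = 0    # restarts at 0 on every run

    def step(self, name: str) -> None:
        # On replay, a step that already completed is not executed again.
        if ("step_completed", name) not in self.events:
            self.events.append(("step_completed", name))

    async def sleep(self, duration: str) -> None:
        sleep_id = self.sleep_counter
        self.sleep_counter += 1
        if ("sleep_completed", sleep_id) in self.events:
            return  # replay: this sleep already finished, continue past it
        self.events.append(("sleep_started", sleep_id, duration))
        raise SuspensionSignal(sleep_id)


async def reminder_sequence(ctx: ToyContext) -> None:
    ctx.step("first_reminder")
    await ctx.sleep("1d")
    ctx.step("second_reminder")
    await ctx.sleep("7d")
    ctx.step("final_reminder")


def run_until_done(workflow) -> list:
    """Re-run the workflow from the top after each suspension until it returns."""
    events = []
    while True:
        try:
            asyncio.run(workflow(ToyContext(events)))
            return events
        except SuspensionSignal as signal:
            # Stand-in for the scheduler waking the workflow at the due time.
            events.append(("sleep_completed", signal.sleep_id))


events = run_until_done(reminder_sequence)
```

Each run starts from the top of the function, but the event log lets completed steps and sleeps be skipped, so every reminder is recorded exactly once.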

Duration Formats

String

await sleep("30s")   # 30 seconds
await sleep("5m")    # 5 minutes
await sleep("2h")    # 2 hours
await sleep("1d")    # 1 day
await sleep("1w")    # 1 week

# Combined
await sleep("1h30m") # 1 hour 30 minutes
await sleep("2d12h") # 2 days 12 hours
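
To make the string format concrete, here is a minimal sketch of how such strings could be parsed into a `timedelta`. The `parse_duration` helper is hypothetical and is not PyWorkflow's parser; it assumes only the five unit suffixes shown above and rejects anything else.

```python
import re
from datetime import timedelta

# Unit suffixes taken from the examples above, mapped to timedelta keywords.
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days", "w": "weeks"}
_TOKEN = re.compile(r"(\d+)([smhdw])")


def parse_duration(text: str) -> timedelta:
    """Parse strings such as "30s" or "1h30m" into a timedelta."""
    tokens = _TOKEN.findall(text)
    # Reject empty input and any text the tokens do not fully account for.
    if not tokens or "".join(number + unit for number, unit in tokens) != text:
        raise ValueError(f"invalid duration: {text!r}")
    total = timedelta()
    for number, unit in tokens:
        total += timedelta(**{_UNITS[unit]: int(number)})
    return total
```

Under these assumptions, `parse_duration("1h30m")` equals `timedelta(hours=1, minutes=30)`, and a malformed string such as `"1h 30m"` raises `ValueError`.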

Timedelta

from datetime import timedelta

await sleep(timedelta(hours=2, minutes=30))
await sleep(timedelta(days=7))

Until Specific Time

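from datetime import datetime, timedelta

# Sleep until a specific datetime
await sleep(datetime(2025, 12, 25, 9, 0, 0))

# Sleep until next Monday at 9 AM (a full week ahead if today is Monday)
now = datetime.now()
next_monday = now + timedelta(days=((7 - now.weekday()) % 7) or 7)
await sleep(next_monday.replace(hour=9, minute=0, second=0, microsecond=0))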

Integer (Seconds)

await sleep(300)  # 300 seconds (5 minutes)
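
All of these formats ultimately describe a single wake time. The sketch below shows one way the non-string inputs could be normalized. `resolve_wake_time` is a hypothetical helper, not part of PyWorkflow; string durations would need parsing first and are deliberately left out.

```python
from datetime import datetime, timedelta, timezone


def resolve_wake_time(value, now: datetime) -> datetime:
    """Return the absolute time at which a sleep should end."""
    if isinstance(value, datetime):
        return value                          # already an absolute time
    if isinstance(value, timedelta):
        return now + value                    # relative duration
    if isinstance(value, (int, float)):
        return now + timedelta(seconds=value)  # plain number of seconds
    raise TypeError(f"unsupported sleep value: {value!r}")


now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
wake = resolve_wake_time(300, now)
```

Passing `now` in explicitly keeps the function deterministic, which matters if the calculation is ever repeated during a replay.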

Zero-Resource Suspension

A traditional async sleep keeps the task, and the worker running it, occupied for the entire duration:
# BAD: This holds a worker for 24 hours!
import asyncio

async def traditional_sleep():
    await do_something()
    await asyncio.sleep(86400)  # Occupies the worker for 24h
    await do_something_else()
PyWorkflow’s sleep releases the worker:
# GOOD: Worker is freed during sleep
from pyworkflow import workflow, sleep

@workflow()
async def efficient_sleep():
    await do_something()
    await sleep("24h")  # Worker freed, resumes later
    await do_something_else()
With 100 workflows each sleeping for 1 day, traditional sleep would need 100 workers blocked for 24 hours. PyWorkflow needs 0 workers during the sleep period.

Use Cases

Scheduled Reminders

@workflow()
async def onboarding_drip(user_id: str):
    await send_email(user_id, "Welcome!")

    await sleep("1d")
    await send_email(user_id, "Getting started tips")

    await sleep("3d")
    await send_email(user_id, "Advanced features")

    await sleep("7d")
    await send_email(user_id, "How's it going?")

Delayed Processing

@workflow()
async def process_refund(order_id: str):
    await validate_refund_request(order_id)

    # Wait for potential fraud review
    await sleep("24h")

    # If not flagged, process refund
    await execute_refund(order_id)
    await notify_customer(order_id)

Rate Limiting

@workflow()
async def batch_api_calls(items: list):
    for i, item in enumerate(items):
        await call_api(item)

        # Rate limit: 10 calls per minute (no pause after the final item)
        if (i + 1) % 10 == 0 and i + 1 < len(items):
            await sleep("1m")

Retry with Backoff

@workflow()
async def resilient_operation():
    for attempt in range(5):
        try:
            result = await risky_step()
            return result
        except TemporaryError:
            if attempt < 4:
                # Exponential backoff: 1m, 2m, 4m, 8m
                delay = f"{2 ** attempt}m"
                await sleep(delay)

    raise FatalError("All retries exhausted")
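
The delay schedule used above can be factored into a small helper so that it is easy to inspect and to cap. This is a sketch only: `backoff_delays` and its `cap_minutes` parameter are hypothetical and are not part of PyWorkflow.

```python
def backoff_delays(max_attempts: int, base_minutes: int = 1, cap_minutes: int = 60) -> list:
    """Delays to wait between attempts, as duration strings such as "4m"."""
    # No delay follows the final attempt, hence max_attempts - 1 entries.
    return [
        f"{min(base_minutes * 2 ** attempt, cap_minutes)}m"
        for attempt in range(max_attempts - 1)
    ]


delays = backoff_delays(5)
```

For five attempts this yields the same 1m, 2m, 4m, 8m sequence as the loop above; the cap keeps later delays from growing without bound when the attempt count is higher.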

Sleep vs Step Timeout

Sleep and timeouts serve different purposes:
| Feature   | Sleep             | Timeout                |
|-----------|-------------------|------------------------|
| Purpose   | Intentional delay | Maximum execution time |
| Resources | Zero during sleep | Worker is active       |
| Failure   | Never fails       | Fails if exceeded      |
@step(timeout="30s")  # Fails if step takes > 30s
async def quick_step():
    pass

@workflow()
async def my_workflow():
    await quick_step()
    await sleep("1h")  # Intentional 1-hour pause
    await quick_step()
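
The difference in failure behavior can be seen with plain asyncio, used here purely as an analogy. This sketch does not use PyWorkflow: `asyncio.wait_for` stands in for a step timeout, and `asyncio.sleep` stands in for an intentional pause (unlike PyWorkflow's sleep, it keeps the task alive while waiting).

```python
import asyncio


async def slow_step() -> str:
    await asyncio.sleep(0.2)   # pretend work that takes 0.2 seconds
    return "done"


async def demo() -> list:
    outcomes = []
    # Timeout: a limit on how long the work may take; exceeding it is an error.
    try:
        await asyncio.wait_for(slow_step(), timeout=0.05)
        outcomes.append("step finished")
    except asyncio.TimeoutError:
        outcomes.append("step timed out")
    # Sleep: a deliberate pause; it simply ends when the time is up.
    await asyncio.sleep(0.05)
    outcomes.append("sleep finished")
    return outcomes


outcomes = asyncio.run(demo())
```

The timed-out step surfaces as an exception that the caller must handle, while the pause completes normally.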

Celery Beat Requirement

Sleep resumption requires Celery Beat to be running:
# Start Celery Beat for scheduled task execution
celery -A pyworkflow.celery.app beat --loglevel=info
Without Celery Beat, workflows will suspend but never resume automatically. Make sure Beat is running in production.

Docker Compose Setup

services:
  worker:
    command: celery -A pyworkflow.celery.app worker --loglevel=info

  beat:
    command: celery -A pyworkflow.celery.app beat --loglevel=info

Best Practices

Don’t use sleep as a retry mechanism. Use step retry configuration instead:
# Good: Use retry config
@step(max_retries=3, retry_delay="exponential")
async def my_step():
    pass

# Avoid: Manual retry with sleep
@workflow()
async def my_workflow():
    for i in range(3):
        try:
            await my_step()
            break
        except Exception:
            await sleep(f"{2**i}m")
When sleeping until a specific time, be aware of timezones:
from datetime import datetime
import pytz

# Explicit timezone
eastern = pytz.timezone("US/Eastern")
wake_time = eastern.localize(datetime(2025, 1, 15, 9, 0))
await sleep(wake_time)
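
To see why the timezone matters, the following sketch computes how long a sleep would last for a timezone-aware wake time. `seconds_until` is a hypothetical helper, and the fixed UTC-5 offset is an assumption standing in for US Eastern standard time in January; a real timezone database also accounts for daylight saving time.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-5 offset standing in for US Eastern standard time in January.
EST = timezone(timedelta(hours=-5), "EST")


def seconds_until(wake_time: datetime, now: datetime) -> float:
    """Seconds from now until wake_time; both must be timezone-aware."""
    if wake_time.tzinfo is None or now.tzinfo is None:
        raise ValueError("naive datetime: attach a timezone first")
    return (wake_time - now).total_seconds()


wake_time = datetime(2025, 1, 15, 9, 0, tzinfo=EST)
now_utc = datetime(2025, 1, 15, 12, 0, tzinfo=timezone.utc)
remaining = seconds_until(wake_time, now_utc)
```

Here 9:00 at UTC-5 is 14:00 UTC, so the sleep lasts two hours from 12:00 UTC. Had the same wall-clock time been read as UTC, the wake time would already be three hours in the past.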
Very long sleeps (months, years) work, but consider whether a different approach would fit better:
# Works, but consider alternatives
await sleep("365d")

# Alternative: Schedule a new workflow
schedule_workflow(annual_review, run_at="2026-01-01")
