What is a Stream Workflow?
A stream workflow is a long-lived, reactive unit of orchestration built on top of the signal pub/sub system. Where a @workflow runs top-to-bottom and completes, a @stream_workflow defines a named channel (“stream”) that one or more @stream_steps subscribe to. Each step reacts to incoming signals, can suspend and be resumed by new signals, and can publish results back.
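The reactive model can be illustrated with a toy in-memory stream. This is a sketch, not pyworkflow code: ToyStream, Signal, and the planner, worker, and collector handlers are hypothetical names, and the model leaves out the durability and suspension that the real decorators provide.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Signal:
    """A typed message travelling on the stream (hypothetical shape)."""
    type: str
    payload: Any = None


Handler = Callable[[Signal], Optional[Signal]]


class ToyStream:
    """In-memory stand-in for a named stream; not pyworkflow's implementation."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._subscribers: dict[str, list[Handler]] = {}
        self.log: list[str] = []  # signal types in delivery order

    def subscribe(self, signal_type: str, handler: Handler) -> None:
        self._subscribers.setdefault(signal_type, []).append(handler)

    def publish(self, signal: Signal) -> None:
        # Deliver the signal, then deliver anything the handlers publish back.
        queue = deque([signal])
        while queue:
            current = queue.popleft()
            self.log.append(current.type)
            for handler in self._subscribers.get(current.type, []):
                reply = handler(current)
                if reply is not None:
                    queue.append(reply)


results: list[str] = []


def planner(signal: Signal) -> Signal:
    # Reacts to a request by assigning the task.
    return Signal("task.assigned", {"task": signal.payload})


def worker(signal: Signal) -> Signal:
    # Reacts to an assignment and publishes a result back onto the stream.
    return Signal("task.done", {"result": f"done: {signal.payload['task']}"})


def collector(signal: Signal) -> None:
    # Terminal consumer: records the result and publishes nothing.
    results.append(signal.payload["result"])


stream = ToyStream("planning")
stream.subscribe("task.requested", planner)
stream.subscribe("task.assigned", worker)
stream.subscribe("task.done", collector)
stream.publish(Signal("task.requested", "index docs"))
```

No handler calls another handler directly. Each one is activated only by the signal type it subscribed to, which is the difference from a top-to-bottom @workflow.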
Key Characteristics
Reactive
Steps react to signals arriving on the stream instead of running top-to-bottom.
Long-lived
A stream workflow stays alive until every subscribed step reaches a terminal state.
Durable Suspension
The parent @workflow is released via hook() while steps wait for signals, so no worker is pinned.
Scoped
Each stream_run_id isolates its own set of subscriptions, signals, and results.
When to Use
Use a stream workflow when your process is event-driven and multi-actor: for example, a planner step waiting for task results from N worker steps, a supervisor loop waiting on human-in-the-loop feedback, or long-running agents that react to external signals over hours or days. Use a regular @workflow when your process is a linear sequence of steps that run to completion.
| Feature | @workflow | @stream_workflow |
|---|---|---|
| Execution model | Top-to-bottom | Signal-driven, reactive |
| Step activation | Explicit await step() | Subscription to signal types |
| Termination | Return from function | All steps reach terminal state |
| Suspension | sleep / hook | hook + signal arrival |
| Best for | Sequential business logic | Agents, multi-actor coordination, HITL loops |
Defining a Stream Workflow
The stream name defaults to the function name when name= is omitted. Every @stream_step with matching stream="..." subscribes to this channel.
Running from a Parent Workflow
Stream workflows are driven from inside a regular @workflow via run_stream_workflow(). This is what gives them durability: the parent workflow records the stream run, suspends via hook() between signals, and the dispatcher resumes the parent once the stream reaches a terminal aggregate state.
Signature
Aggregate Lifecycle
A stream run is in one of three aggregate states derived from its subscription rows:
| Aggregate | Condition | Parent workflow behavior |
|---|---|---|
| running | At least one step is waiting or running | Parent is suspended via hook(), worker released |
| completed | Every step is terminated | run_stream_workflow returns StreamWorkflowResult(status="completed") |
| suspended | At least one step suspended, none waiting/running | Parent re-raises SuspensionSignal (bubbles up HITL) |
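The derivation in the table can be written as a small pure function. This is a sketch under assumptions: the per-step status strings ("waiting", "running", "suspended", "terminated") are taken from the table's wording and may not match the values stored in the subscription rows, and aggregate_state is a hypothetical helper, not part of pyworkflow.

```python
from typing import Iterable

# Assumed per-step status values, based on the wording of the table.
KNOWN_STEP_STATES = {"waiting", "running", "suspended", "terminated"}


def aggregate_state(step_states: Iterable[str]) -> str:
    """Derive the aggregate state of a stream run from its steps' states."""
    states = list(step_states)
    unknown = set(states) - KNOWN_STEP_STATES
    if unknown:
        raise ValueError(f"unknown step states: {sorted(unknown)}")

    # Any step still waiting or running keeps the whole run "running".
    if any(s in ("waiting", "running") for s in states):
        return "running"
    # Nothing is waiting or running, but at least one step is suspended.
    if "suspended" in states:
        return "suspended"
    # Every remaining step is terminated. An empty run also lands here,
    # which is an assumption of this sketch.
    return "completed"
```

The order of the checks matters: a run that has both a suspended step and a running step is still running, because the suspended state requires that no step is waiting or running.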
StreamWorkflowResult
Any value a step passes to await set_result(...) in its lifecycle body is attached to the subscription row and surfaced here, so the parent workflow does not need to read checkpoints.
Backend Support
Stream workflows are fully supported on:
- PostgreSQL: the primary target (migrations v5/v6/v7 add the required columns)
- Citus: inherits from Postgres, with reference-table distribution
- InMemory: for unit tests
Other backends do not scope subscription state by stream_run_id. Running concurrent stream runs on those backends will cross-contaminate subscription state, so stick to Postgres/Citus for production streams.
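A toy model shows why state that is not keyed by stream_run_id cross-contaminates between concurrent runs. Both store classes below are hypothetical illustrations and do not reflect how any pyworkflow backend is implemented.

```python
from typing import Optional


class UnscopedStore:
    """Keys step status by step name only, ignoring the run id."""

    def __init__(self) -> None:
        self._status: dict[str, str] = {}

    def set_status(self, stream_run_id: str, step: str, status: str) -> None:
        self._status[step] = status  # stream_run_id is dropped here

    def get_status(self, stream_run_id: str, step: str) -> Optional[str]:
        return self._status.get(step)


class ScopedStore:
    """Keys step status by (stream_run_id, step), so runs stay isolated."""

    def __init__(self) -> None:
        self._status: dict[tuple[str, str], str] = {}

    def set_status(self, stream_run_id: str, step: str, status: str) -> None:
        self._status[(stream_run_id, step)] = status

    def get_status(self, stream_run_id: str, step: str) -> Optional[str]:
        return self._status.get((stream_run_id, step))


def simulate(store) -> Optional[str]:
    # Two concurrent runs of the same stream touch the same step name.
    store.set_status("run-a", "planner", "waiting")
    store.set_status("run-b", "planner", "terminated")
    # What does run-a see for its own planner step?
    return store.get_status("run-a", "planner")


unscoped_view = simulate(UnscopedStore())  # run-b's write leaks into run-a
scoped_view = simulate(ScopedStore())      # run-a still sees its own state
```

In the unscoped store, run-a reads the status written by run-b, so an aggregate state computed for run-a would be wrong.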
Scheduled Signals
Use schedule_signal() to emit a signal after a delay. A Celery beat task (pyworkflow.streams.drain_scheduled_signals, runs every 2s) polls the scheduled_signals table and emits due rows.
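The poll-and-drain pattern can be sketched with an in-memory queue standing in for the scheduled_signals table. ToyScheduler, its schedule() and drain() methods, and the explicit now argument are assumptions made for illustration; the actual signature of schedule_signal() and the table's columns are not shown here.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class ScheduledRow:
    """One pending row; ordering compares due_at only."""
    due_at: float
    signal_type: str = field(compare=False)
    payload: dict = field(compare=False, default_factory=dict)


class ToyScheduler:
    """In-memory stand-in for the scheduled_signals table and its drain task."""

    def __init__(self) -> None:
        self._rows: list[ScheduledRow] = []  # min-heap ordered by due_at
        self.emitted: list[str] = []

    def schedule(self, signal_type: str, delay_seconds: float, now: float) -> None:
        # Record the signal with an absolute due time instead of emitting it.
        heapq.heappush(self._rows, ScheduledRow(now + delay_seconds, signal_type))

    def drain(self, now: float) -> int:
        # Emit every row whose due time has passed; leave the rest queued.
        count = 0
        while self._rows and self._rows[0].due_at <= now:
            row = heapq.heappop(self._rows)
            self.emitted.append(row.signal_type)
            count += 1
        return count


scheduler = ToyScheduler()
scheduler.schedule("reminder", delay_seconds=5, now=0)
scheduler.schedule("timeout", delay_seconds=1, now=0)

first_poll = scheduler.drain(now=2)   # only "timeout" is due
second_poll = scheduler.drain(now=4)  # nothing new is due yet
third_poll = scheduler.drain(now=6)   # "reminder" is due now
```

Because delivery happens only when the poller runs, a signal in this model is emitted at the first poll after its due time rather than at the due time itself. With a 2-second polling interval, that suggests a delay of up to roughly one interval.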
Related Concepts
- Stream Steps: the reactive units that subscribe to streams
- Hooks: stream workflows use hook() internally for parent suspension
- Workflows: stream workflows are driven from inside regular @workflows