What is a Stream Step?
A stream step is a long-lived reactive unit that subscribes to a named stream and runs in response to signals. Unlike a regular @step, which runs once when called from a workflow, a @stream_step is materialized as a subscription row when its parent stream workflow starts and is then re-invoked each time a matching signal arrives, until it explicitly terminates.
Key Characteristics
- Signal-driven: The step body runs once on start and again on each matching signal arrival.
- Stateful: Use save_checkpoint() / get_checkpoint() to carry state across resumes without rebuilding from scratch.
- Suspendable: suspend() parks the step and bubbles a SuspensionSignal up to the parent workflow, which suits human-in-the-loop (HITL) flows.
- Schema-aware: Bind Pydantic models to signal types and the dispatcher validates payloads before delivery.
The Two Code Paths
A stream step has two code paths that the runtime invokes at different times:
- on_signal callback: runs on every matching signal arrival. It receives the raw Signal and a StreamStepContext. Its job is to decide whether to resume(), cancel(), terminate(), or suspend(), so it is typically very lightweight.
- The lifecycle function (the decorated function itself): runs on first start and again on each explicit ctx.resume(). This is where the actual work happens.
If you don't provide on_signal, the default callback (_auto_resume_on_signal) just calls ctx.resume() for every matching signal, which is the right behavior in most cases.
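The split between the two paths can be illustrated with a toy model. This is a sketch, not pyworkflow's dispatcher: ToyContext, dispatch, only_urgent, and the signal dicts are hypothetical stand-ins, and the toy passes the signal straight to the lifecycle function for brevity. Only the names on_signal and resume() come from the description above.

```python
import asyncio
from typing import List, Optional


class ToyContext:
    """Hypothetical stand-in for StreamStepContext: records the callback's decision."""

    def __init__(self) -> None:
        self.resume_requested = False

    async def resume(self) -> None:
        self.resume_requested = True


async def auto_resume(signal: dict, ctx: ToyContext) -> None:
    # Same behaviour the text describes for the default callback.
    await ctx.resume()


async def dispatch(lifecycle, signal: dict, on_signal=auto_resume) -> bool:
    ctx = ToyContext()
    await on_signal(signal, ctx)   # path 1: cheap routing decision
    if ctx.resume_requested:
        await lifecycle(signal)    # path 2: the actual work
    return ctx.resume_requested


calls: List[Optional[str]] = []


async def lifecycle(signal: Optional[dict]) -> None:
    # Record which signal type (if any) triggered this invocation.
    calls.append(None if signal is None else signal["type"])


async def only_urgent(signal: dict, ctx: ToyContext) -> None:
    # Custom callback: filter here, leave the work to the lifecycle function.
    if signal.get("urgent"):
        await ctx.resume()


async def demo() -> List[bool]:
    await lifecycle(None)  # first start: no triggering signal
    return [
        await dispatch(lifecycle, {"type": "order.created"}),
        await dispatch(lifecycle, {"type": "order.created", "urgent": False}, only_urgent),
        await dispatch(lifecycle, {"type": "order.escalated", "urgent": True}, only_urgent),
    ]


resumed = asyncio.run(demo())
```

With the default callback every signal reaches the lifecycle function; with only_urgent, the non-urgent signal is dropped before any work happens.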
Defining Signals
The signals= argument accepts either a plain list of names or a dict mapping names to Pydantic schemas.
- List of names
- Pydantic schemas
When a schema is bound to a signal type, payloads that fail validation are rejected by the dispatcher and the step is not invoked for that signal.
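The gate described here can be sketched in plain Python. This is an illustration under assumptions, not the library's dispatcher: normalize and should_deliver are hypothetical helpers, the signal names are made up, and a standard-library dataclass stands in for a Pydantic model so the example has no third-party dependency.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Union

# A "schema" is any callable that raises on a bad payload
# (a stand-in for a Pydantic model class).
Schema = Callable[..., Any]
SignalSpec = Union[List[str], Dict[str, Schema]]


@dataclass
class OrderCreated:
    """Hypothetical payload schema."""
    order_id: str
    amount: float

    def __post_init__(self) -> None:
        if not isinstance(self.order_id, str):
            raise TypeError("order_id must be a string")
        if not isinstance(self.amount, (int, float)) or self.amount < 0:
            raise ValueError("amount must be a non-negative number")


def normalize(signals: SignalSpec) -> Dict[str, Optional[Schema]]:
    """Turn either accepted form into one mapping of name -> schema (or None)."""
    if isinstance(signals, dict):
        return dict(signals)
    return {name: None for name in signals}


def should_deliver(spec: Dict[str, Optional[Schema]], signal_type: str, payload: dict) -> bool:
    """Return True only if the step subscribes to the type and the payload validates."""
    if signal_type not in spec:
        return False
    schema = spec[signal_type]
    if schema is None:
        return True  # plain name: nothing to validate
    try:
        schema(**payload)
    except (TypeError, ValueError):
        return False  # rejected: the step is not invoked for this signal
    return True


plain = normalize(["order.created", "order.cancelled"])
typed = normalize({"order.created": OrderCreated})
```

A plain name accepts any payload, while a bound schema rejects a payload with missing or invalid fields before the step ever sees it.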
Lifecycle Primitives
These helpers, imported from pyworkflow.streams, are how the lifecycle function communicates with the dispatcher. They all set state that the dispatcher reads after the function returns; they don’t immediately mutate the subscription row.
| Primitive | Purpose |
|---|---|
| await get_current_signal() | Returns the Signal that triggered this resume (or None on first start). |
| await set_result(value) | Attach a result payload to the subscription. Surfaced via StreamWorkflowResult.get_result(...). |
| await terminate() | Mark the step terminated; it will not be invoked again. |
| await suspend(reason, resume_signals=None) | Mark the step suspended; bubbles a SuspensionSignal up to the parent workflow (HITL). |
| await save_checkpoint(data) | Persist a JSON-serializable dict for the next resume. |
| await get_checkpoint() | Load the most recently saved checkpoint dict (or None). |
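The "set state now, apply after return" behaviour can be modelled with a small toy. The function names mirror the table, but every body below is a hypothetical stand-in built on contextvars; the row dict, the invoke helper, and the tick signals are assumptions made for illustration, not pyworkflow internals.

```python
import asyncio
import contextvars
from typing import Any, Optional

# Toy model only: holds the subscription row, the triggering signal,
# and the changes requested during the current invocation.
_frame = contextvars.ContextVar("frame")


async def get_current_signal() -> Optional[dict]:
    return _frame.get()["signal"]


async def get_checkpoint() -> Optional[dict]:
    return _frame.get()["row"]["checkpoint"]


async def save_checkpoint(data: dict) -> None:
    _frame.get()["pending"]["checkpoint"] = data


async def set_result(value: Any) -> None:
    _frame.get()["pending"]["result"] = value


async def terminate() -> None:
    _frame.get()["pending"]["status"] = "terminated"


async def invoke(fn, row: dict, signal: Optional[dict]) -> None:
    frame = {"row": row, "signal": signal, "pending": {}}
    token = _frame.set(frame)
    try:
        await fn()
    finally:
        _frame.reset(token)
    # Only after the function returns are the recorded changes applied.
    row.update(frame["pending"])


async def count_to_three() -> None:
    signal = await get_current_signal()
    if signal is None:
        # First start: seed state, do no work.
        await save_checkpoint({"seen": 0})
        return
    state = (await get_checkpoint()) or {"seen": 0}
    seen = state["seen"] + 1
    await save_checkpoint({"seen": seen})
    if seen == 3:
        await set_result({"total": seen})
        await terminate()


async def demo() -> dict:
    row = {"status": "waiting", "checkpoint": None, "result": None}
    await invoke(count_to_three, row, None)
    for n in range(3):
        await invoke(count_to_three, row, {"type": "tick", "n": n})
    return row


final_row = asyncio.run(demo())
```

The lifecycle function never touches the row directly. It reads the previous checkpoint, records what it wants changed, and the toy dispatcher commits those changes once the call has finished.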
The Signal Object
get_current_signal() returns a Signal dataclass.
The sequence field gives you a strict ordering across all signals on the stream, which is useful for deduping or detecting gaps. source_run_id is set automatically when emit() is called from inside a workflow context.
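As a sketch of how sequence might be used for deduping and gap detection: ToySignal below is a hypothetical stand-in carrying only the two fields named above, and the logic assumes sequence numbers are consecutive integers and that the step sees every signal on the stream. Neither assumption is confirmed here.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class ToySignal:
    """Hypothetical stand-in; the real Signal has more fields."""
    sequence: int
    source_run_id: Optional[str] = None


def classify(last_seen: Optional[int], signal: ToySignal) -> str:
    """Compare a signal's sequence with the highest sequence handled so far."""
    if last_seen is not None and signal.sequence <= last_seen:
        return "duplicate"  # already handled, safe to skip
    if last_seen is not None and signal.sequence > last_seen + 1:
        return "gap"        # at least one signal was missed (assumes step size 1)
    return "in_order"


def replay(sequences: List[int]) -> List[str]:
    last: Optional[int] = None
    verdicts = []
    for seq in sequences:
        verdict = classify(last, ToySignal(sequence=seq))
        verdicts.append(verdict)
        if verdict != "duplicate":
            last = seq  # advance the high-water mark
    return verdicts


verdicts = replay([1, 2, 2, 5])
```

In a real step, the last handled sequence would need to survive between resumes, for example by storing it in the checkpoint.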
Emitting Signals from a Step
Stream steps can emit signals back to the same or another stream; this is how multi-actor pipelines fan out work between participants. emit() automatically picks up the current stream_run_id from context, so signals stay scoped to the same run unless you pass an explicit override.
The StreamStepContext (for on_signal callbacks)
When you pass a custom on_signal=..., the callback receives (signal, ctx) where ctx is a StreamStepContext:
| Method | Effect |
|---|---|
| await ctx.resume() | Trigger the lifecycle function with signal as the current signal. |
| await ctx.terminate() | Mark the step terminated without invoking the lifecycle function. |
| await ctx.suspend(reason, resume_signals=None) | Park the step; bubbles to the parent workflow as a SuspensionSignal. |
| await ctx.cancel(reason=None) | Cancel the step (terminal, with an optional reason). |
ctx also exposes ctx.status, ctx.run_id, and ctx.stream_id for inspection.
Terminal States
Every stream step subscription is in one of these states. The aggregate of all step states determines whether the parent stream workflow is running, completed, or suspended; see Stream Workflows: Aggregate Lifecycle.
| State | Meaning |
|---|---|
| waiting | Subscribed and parked, ready for the next matching signal. |
| running | Lifecycle function is currently executing on a worker. |
| suspended | suspend() was called; a SuspensionSignal bubbles up to the parent. |
| terminated | terminate() was called; the step is done and will never run again. |
| cancelled | cancel() was called from an on_signal callback. |
Best Practices
- Keep on_signal callbacks lightweight. They run for every signal, including ones you ignore; do filtering there, do work in the lifecycle function.
- Always handle the signal is None case. The first invocation has no triggering signal; that’s the registration phase. Use it to seed state, not to do work.
- Use set_result() instead of save_checkpoint() for output the parent needs. save_checkpoint is for internal state across resumes; set_result is the parent-facing return value.
- Call terminate() or suspend() when you’re done. A step that just returns without setting a terminal state stays in waiting and keeps the parent stream workflow alive.
- Don’t rely on globals. Step lifecycles can run on different workers between resumes, so persist anything you need via save_checkpoint().
Related Concepts
- Stream Workflows — the parent runtime that hosts and drives stream steps
- Steps — the regular, run-once step model for sequential workflows
- Hooks — the suspension primitive that stream workflows use under the hood