What is a Stream Step?

A stream step is a long-lived reactive unit that subscribes to a named stream and runs in response to signals. Unlike a regular @step — which runs once when called from a workflow — a @stream_step is materialized as a subscription row when its parent stream workflow starts, and is then re-invoked each time a matching signal arrives, until it explicitly terminates.
from pyworkflow import stream_step
from pyworkflow.streams import get_current_signal, set_result, terminate

@stream_step(stream="agent_comms", signals=["task.assigned"])
async def worker():
    signal = await get_current_signal()
    if signal is None:
        # First start — no signal yet, just register and wait.
        return

    task_id = signal.payload["task_id"]
    result = await do_work(task_id)

    await set_result({"task_id": task_id, "result": result})
    await terminate()

Key Characteristics

Signal-driven

The step body runs once on start and again each time a matching signal resumes it (by default, on every matching signal).

Stateful

Use save_checkpoint() / get_checkpoint() to carry state across resumes without rebuilding from scratch.

Suspendable

suspend() parks the step and bubbles a SuspensionSignal up to the parent workflow, which suits human-in-the-loop (HITL) approvals.

Schema-aware

Bind Pydantic models to signal types and the dispatcher validates payloads before delivery.

The Two Code Paths

A stream step has two code paths the runtime invokes at different times:
  1. on_signal callback — runs on every matching signal arrival. It receives the raw Signal and a StreamStepContext. Its job is to decide whether to resume(), cancel(), terminate(), or suspend() — typically very lightweight.
  2. The lifecycle function (the decorated function itself) — runs on first start and again on each explicit ctx.resume(). This is where the actual work happens.
If you don’t pass an on_signal, the default callback (_auto_resume_on_signal) simply calls ctx.resume() for every matching signal, which is the right behavior for most steps.
from pyworkflow import stream_step
from pyworkflow.streams import get_current_signal, terminate

# Custom on_signal: only resume on the high-priority variant.
async def gate_priority(signal, ctx):
    if signal.payload.get("priority") == "high":
        await ctx.resume()
    # Otherwise: do nothing — the signal is acked and the step stays waiting.

@stream_step(
    stream="agent_comms",
    signals=["task.created"],
    on_signal=gate_priority,
)
async def priority_worker():
    signal = await get_current_signal()
    if signal is None:
        return
    await handle_high_priority(signal.payload)
    await terminate()
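
The interplay between the two code paths can be pictured with a small in-memory model. This is a toy sketch, not pyworkflow's implementation: ToySignal, ToyContext, ToyStep, and deliver are hypothetical names, and the real runtime persists subscriptions and runs on workers. It only shows that the on_signal callback sees every matching signal while the lifecycle function runs only when resume() was requested.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class ToySignal:
    """Hypothetical stand-in for a signal: just a type and a payload."""
    signal_type: str
    payload: dict


@dataclass
class ToyContext:
    """Hypothetical stand-in for StreamStepContext; only resume() is modeled."""
    resume_requested: bool = False

    async def resume(self) -> None:
        # Record the request; the toy dispatcher acts on it afterwards.
        self.resume_requested = True


@dataclass
class ToyStep:
    lifecycle: Callable[[Optional[ToySignal]], Awaitable[None]]
    on_signal: Callable[[ToySignal, ToyContext], Awaitable[None]]


async def auto_resume(signal: ToySignal, ctx: ToyContext) -> None:
    """Mirrors the described default: resume on every matching signal."""
    await ctx.resume()


async def deliver(step: ToyStep, signal: ToySignal) -> bool:
    """Path 1 runs for every signal; path 2 runs only if resume() was called."""
    ctx = ToyContext()
    await step.on_signal(signal, ctx)
    if ctx.resume_requested:
        await step.lifecycle(signal)
    return ctx.resume_requested


async def demo() -> list:
    handled = []

    async def lifecycle(signal):
        if signal is None:
            return  # first start: nothing to process yet
        handled.append(signal.payload["task_id"])

    async def gate_priority(signal, ctx):
        if signal.payload.get("priority") == "high":
            await ctx.resume()

    step = ToyStep(lifecycle=lifecycle, on_signal=gate_priority)
    await step.lifecycle(None)  # first start, no triggering signal
    await deliver(step, ToySignal("task.created", {"task_id": "a", "priority": "low"}))
    await deliver(step, ToySignal("task.created", {"task_id": "b", "priority": "high"}))
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model only the high-priority task reaches the lifecycle function; swapping gate_priority for auto_resume would let both through.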

Defining Signals

The signals= argument accepts either a plain list of names or a dict mapping names to Pydantic schemas.
from pyworkflow import stream_step
from pyworkflow.streams import get_current_signal

@stream_step(
    stream="agent_comms",
    signals=["task.created", "task.updated", "task.cancelled"],
)
async def planner():
    signal = await get_current_signal()
    ...
When a schema is bound to a signal type, payloads that fail validation are rejected by the dispatcher and the step is not invoked for that signal.
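
The dict form of signals= might look like the following. This is a minimal sketch: the TaskCreated and TaskCancelled models, their fields, and the is_valid helper are illustrative assumptions, and the local check only approximates what the dispatcher's validation would accept or reject.

```python
from pydantic import BaseModel, ValidationError


class TaskCreated(BaseModel):
    task_id: str
    priority: str = "normal"


class TaskCancelled(BaseModel):
    task_id: str
    reason: str


# Dict form of signals=: signal name -> Pydantic model.
SIGNAL_SCHEMAS = {
    "task.created": TaskCreated,
    "task.cancelled": TaskCancelled,
}

# Passed to the decorator in place of the plain list, for example:
#   @stream_step(stream="agent_comms", signals=SIGNAL_SCHEMAS)


def is_valid(signal_type: str, payload: dict) -> bool:
    """Local check (hypothetical helper) for whether a payload fits its schema."""
    model = SIGNAL_SCHEMAS[signal_type]
    try:
        model(**payload)
    except ValidationError:
        return False
    return True
```

A check like this is handy in publisher-side tests, so that malformed payloads are caught before they are emitted and silently rejected.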

Lifecycle Primitives

These helpers, imported from pyworkflow.streams, are how the lifecycle function communicates with the dispatcher. The helpers that write state record it for the dispatcher to read after the function returns; they don’t immediately mutate the subscription row. get_current_signal() and get_checkpoint() only read.
| Primitive | Purpose |
| --- | --- |
| await get_current_signal() | Returns the Signal that triggered this resume (or None on first start). |
| await set_result(value) | Attach a result payload to the subscription. Surfaced via StreamWorkflowResult.get_result(...). |
| await terminate() | Mark the step terminated; it will not be invoked again. |
| await suspend(reason, resume_signals=None) | Mark the step suspended; bubbles a SuspensionSignal up to the parent workflow (HITL). |
| await save_checkpoint(data) | Persist a JSON-serializable dict for the next resume. |
| await get_checkpoint() | Load the most recently saved checkpoint dict (or None). |
from pyworkflow import stream_step
from pyworkflow.streams import (
    get_current_signal,
    get_checkpoint,
    save_checkpoint,
    set_result,
    suspend,
    terminate,
)

@stream_step(stream="reviews", signals=["review.submitted", "human.approved"])
async def review_loop():
    signal = await get_current_signal()
    state = (await get_checkpoint()) or {"submissions": 0}

    if signal is None:
        return  # initial start

    if signal.signal_type == "review.submitted":
        state["submissions"] += 1
        await save_checkpoint(state)

        if state["submissions"] >= 3:
            # Park the step until a human reviews it.
            await suspend(
                reason="hitl:needs_review",
                resume_signals=["human.approved"],
            )
            return

    if signal.signal_type == "human.approved":
        await set_result({"approved": True, "submissions": state["submissions"]})
        await terminate()
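
Because the lifecycle function is re-entered with only the current signal and the last checkpoint, its branching can be factored into a pure function and exercised without a running stream. The sketch below assumes the same signal names and threshold as the example above; next_action and its action strings are hypothetical helpers, not part of pyworkflow.

```python
from typing import Optional, Tuple


def next_action(state: dict, signal_type: Optional[str]) -> Tuple[dict, str]:
    """Return (new_state, action); action is 'wait', 'suspend', or 'terminate'."""
    if signal_type is None:
        return state, "wait"  # initial start: nothing to process

    if signal_type == "review.submitted":
        new_state = {**state, "submissions": state["submissions"] + 1}
        action = "suspend" if new_state["submissions"] >= 3 else "wait"
        return new_state, action

    if signal_type == "human.approved":
        return state, "terminate"

    return state, "wait"  # unknown signal types leave the step parked


# Replay a sequence of signals the way successive resumes would see them.
state = {"submissions": 0}
actions = []
for signal_type in [None, "review.submitted", "review.submitted",
                    "review.submitted", "human.approved"]:
    state, action = next_action(state, signal_type)
    actions.append(action)
```

The lifecycle function would then persist the new state with save_checkpoint() and call the primitive that matches the returned action.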

The Signal Object

get_current_signal() returns a Signal dataclass:
@dataclass
class Signal:
    signal_id: str
    stream_id: str
    signal_type: str
    payload: Any                      # dict (validated against schema if configured)
    published_at: datetime
    sequence: int | None              # Per-stream ordering, assigned by storage
    source_run_id: str | None         # run_id of the workflow that called emit()
    stream_run_id: str | None         # Scoping key for this stream run
    metadata: dict[str, Any]
The sequence field gives you a strict ordering across all signals on the stream — useful for deduping or detecting gaps. source_run_id is set automatically when emit() is called from inside a workflow context.
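
As a rough illustration of deduping and gap detection, a step could keep the last processed sequence in its checkpoint and compare each incoming value against it. The sketch assumes sequence numbers are contiguous integers and that the step sees every signal on the stream; a step subscribed to only some signal types would see gaps that are not losses. classify_sequence is a hypothetical helper.

```python
from typing import Optional, Tuple


def classify_sequence(
    last_seen: Optional[int], sequence: Optional[int]
) -> Tuple[str, Optional[int]]:
    """Compare an incoming sequence number with the last one processed.

    Returns (verdict, new_last_seen), where verdict is one of
    'unordered', 'duplicate', 'gap', or 'ok'.
    """
    if sequence is None:
        return "unordered", last_seen  # storage assigned no sequence
    if last_seen is not None and sequence <= last_seen:
        return "duplicate", last_seen  # already processed; skip it
    if last_seen is not None and sequence > last_seen + 1:
        return "gap", sequence  # something in between was not seen
    return "ok", sequence
```

The returned new_last_seen is what would be written back with save_checkpoint() so the comparison survives across resumes.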

Emitting Signals from a Step

Stream steps can emit signals back to the same or another stream — this is how multi-actor pipelines fan out work between participants.
from pyworkflow import stream_step
from pyworkflow.streams import emit, get_current_signal, terminate

@stream_step(stream="orders", signals=["order.placed"])
async def fulfillment():
    signal = await get_current_signal()
    if signal is None:
        return

    order_id = signal.payload["order_id"]
    await ship(order_id)

    # Notify the billing stream that this order is ready to invoice.
    await emit("billing", "order.shipped", {"order_id": order_id})

    await terminate()
emit() automatically picks up the current stream_run_id from context, so signals stay scoped to the same run unless you pass an explicit override.
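
One common way to get this kind of implicit scoping in async Python is a context variable with an optional explicit override. The sketch below illustrates that pattern only; it is an assumption about the general mechanism, not pyworkflow's implementation, and current_stream_run_id, resolve_stream_run_id, toy_emit, and the record shape are hypothetical.

```python
import asyncio
from contextvars import ContextVar
from typing import Optional

# Ambient value for the running stream; unset outside of any run.
current_stream_run_id: ContextVar[Optional[str]] = ContextVar(
    "current_stream_run_id", default=None
)


def resolve_stream_run_id(override: Optional[str] = None) -> Optional[str]:
    """An explicit override wins; otherwise fall back to the ambient value."""
    return override if override is not None else current_stream_run_id.get()


async def toy_emit(stream: str, signal_type: str, payload: dict,
                   stream_run_id: Optional[str] = None) -> dict:
    """Build the record an emit-like call might publish (hypothetical shape)."""
    return {
        "stream_id": stream,
        "signal_type": signal_type,
        "payload": payload,
        "stream_run_id": resolve_stream_run_id(stream_run_id),
    }


async def demo_scoping() -> list:
    token = current_stream_run_id.set("run-1")
    try:
        scoped = await toy_emit("billing", "order.shipped", {"order_id": "o1"})
        overridden = await toy_emit("billing", "order.shipped",
                                    {"order_id": "o1"}, stream_run_id="run-2")
    finally:
        current_stream_run_id.reset(token)
    return [scoped["stream_run_id"], overridden["stream_run_id"]]
```

Context variables are copied per asyncio task, so concurrent runs would each see their own value without passing it through every call.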

The StreamStepContext (for on_signal callbacks)

When you pass a custom on_signal=..., the callback receives (signal, ctx) where ctx is a StreamStepContext:
| Method | Effect |
| --- | --- |
| await ctx.resume() | Trigger the lifecycle function with signal as the current signal. |
| await ctx.terminate() | Mark the step terminated without invoking the lifecycle function. |
| await ctx.suspend(reason, resume_signals=None) | Park the step; bubbles to the parent workflow as a SuspensionSignal. |
| await ctx.cancel(reason=None) | Cancel the step (terminal, with a reason). |
ctx also exposes ctx.status, ctx.run_id, and ctx.stream_id for inspection.
from pyworkflow import stream_step

async def cancel_on_kill(signal, ctx):
    if signal.signal_type == "stream.kill":
        await ctx.cancel(reason="received kill signal")
        return
    await ctx.resume()

@stream_step(
    stream="agent_comms",
    signals=["task.created", "stream.kill"],
    on_signal=cancel_on_kill,
)
async def killable_worker():
    ...

Terminal States

Every stream step subscription is in exactly one of these states at a time; waiting and running are transient, while the others pause or end the step. The aggregate of all step states determines whether the parent stream workflow is running, completed, or suspended (see Stream Workflows: Aggregate Lifecycle).
| State | Meaning |
| --- | --- |
| waiting | Subscribed and parked, ready for the next matching signal. |
| running | Lifecycle function is currently executing on a worker. |
| suspended | suspend() was called; bubbles SuspensionSignal up to the parent. |
| terminated | terminate() was called; the step is done and will never run again. |
| cancelled | cancel() was called from an on_signal callback. |

Best Practices

  • Keep on_signal callbacks lightweight. They run for every signal, including ones you ignore — do filtering there, do work in the lifecycle function.
  • Always handle the signal is None case. The first invocation has no triggering signal; that’s the registration phase. Use it to seed state, not to do work.
  • Use set_result() instead of save_checkpoint() for output the parent needs. save_checkpoint is for internal state across resumes; set_result is the parent-facing return value.
  • Call terminate() when the step is done, or suspend() when it must wait on a human. A step that just returns without doing either stays in waiting and keeps the parent stream workflow alive.
  • Don’t rely on globals. Step lifecycles can run on different workers between resumes — persist anything you need via save_checkpoint().

Related

  • Stream Workflows: the parent runtime that hosts and drives stream steps
  • Steps: the regular, run-once step model for sequential workflows
  • Hooks: the suspension primitive that stream workflows use under the hood