
What is a Stream Workflow?

A stream workflow is a long-lived, reactive unit of orchestration built on top of the signal pub/sub system. Where a @workflow runs top-to-bottom and completes, a @stream_workflow defines a named channel (“stream”) that one or more @stream_steps subscribe to. Each step reacts to incoming signals, can suspend and be resumed by new signals, and can publish results back.
from pyworkflow import stream_workflow, stream_step, emit, run_stream_workflow, workflow

@stream_workflow(name="agent_comms")
async def agent_comms():
    """Defines the 'agent_comms' channel. Body runs once on start."""
    await emit("agent_comms", "agent.started", {"ts": "now"})

@stream_step(stream="agent_comms", signals=["task.assigned"])
async def worker():
    from pyworkflow.streams import get_current_signal, set_result, terminate
    signal = await get_current_signal()
    if signal is None:
        return  # initial start, no signal yet
    await set_result({"task_id": signal.payload["task_id"]})
    await terminate()

@workflow()
async def run_agent(run_id: str):
    # Drive the stream workflow from inside a durable @workflow.
    result = await run_stream_workflow(agent_comms, stream_run_id=run_id)
    return result.step_results

Key Characteristics

  • Reactive: steps react to signals arriving on the stream instead of running top-to-bottom.
  • Long-lived: a stream workflow stays alive until every subscribed step reaches a terminal state.
  • Durable Suspension: the parent @workflow is released via hook() while steps wait for signals; no worker is pinned.
  • Scoped: each stream_run_id isolates its own set of subscriptions, signals, and results.
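The scoping rule can be pictured with a toy in-memory model. This is an illustrative sketch, not the real storage schema: the tuple key and the "waiting" status string are assumptions.

```python
# Toy model of subscription scoping: each stream run keeps its own
# subscription rows, keyed by (stream_run_id, step_name), so two runs
# of the same stream never see each other's state.
subscriptions: dict[tuple[str, str], str] = {}

def subscribe(stream_run_id: str, step_name: str) -> None:
    """Record a subscription row for one step in one stream run."""
    subscriptions[(stream_run_id, step_name)] = "waiting"

def steps_for_run(stream_run_id: str) -> list[str]:
    """Return only the steps subscribed under this run's scope."""
    return [step for (run, step) in subscriptions if run == stream_run_id]

subscribe("stream_job_1", "worker")
subscribe("stream_job_2", "worker")  # same step name, isolated run
```

Because every lookup is filtered by stream_run_id, signals and results from one run cannot leak into another.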

When to Use

Use a stream workflow when your process is event-driven and multi-actor — e.g. a planner step waiting for task results from N worker steps, a supervisor loop waiting on human-in-the-loop feedback, or long-running agents that react to external signals over hours or days. Use a regular @workflow when your process is a linear sequence of steps that run to completion.
| Feature | @workflow | @stream_workflow |
|---|---|---|
| Execution model | Top-to-bottom | Signal-driven, reactive |
| Step activation | Explicit await step() | Subscription to signal types |
| Termination | Return from function | All steps reach terminal state |
| Suspension | sleep / hook | hook + signal arrival |
| Best for | Sequential business logic | Agents, multi-actor coordination, HITL loops |

Defining a Stream Workflow

from pyworkflow import stream_workflow, emit

@stream_workflow(name="order_pipeline")
async def order_pipeline():
    """
    The body runs once at startup, after all @stream_step subscriptions
    for this stream have been materialized. Use it to emit the initial
    signal that kicks the pipeline off.
    """
    await emit("order_pipeline", "pipeline.started", {})
The stream name defaults to the function name when name= is omitted. Every @stream_step with matching stream="..." subscribes to this channel.

Running from a Parent Workflow

Stream workflows are driven from inside a regular @workflow via run_stream_workflow(). This is what gives them durability: the parent workflow records the stream run, suspends via hook() between signals, and the dispatcher resumes the parent once the stream reaches a terminal aggregate state.
from pyworkflow import workflow, run_stream_workflow
from pyworkflow.streams import StreamWorkflowResult

@workflow()
async def agent_run(job_id: str):
    async def init():
        # Optional: seed initial state before the stream body runs
        await seed_job(job_id)

    result: StreamWorkflowResult = await run_stream_workflow(
        order_pipeline,
        stream_run_id=f"stream_{job_id}",
        init=init,
    )

    # Read step outputs published via set_result()
    planner_output = result.get_result("planner")
    return {"job_id": job_id, "planner": planner_output}

Signature

await run_stream_workflow(
    stream_workflow_func,     # the @stream_workflow-decorated function
    *,
    stream_run_id: str,       # unique scope key for this run
    init: Callable | None = None,   # optional async init callback
    storage = None,           # defaults to context / global config
) -> StreamWorkflowResult

Aggregate Lifecycle

A stream run is in one of three aggregate states derived from its subscription rows:
| Aggregate | Condition | Parent workflow behavior |
|---|---|---|
| running | At least one step is waiting or running | Parent is suspended via hook(), worker released |
| completed | Every step is terminated | run_stream_workflow returns StreamWorkflowResult(status="completed") |
| suspended | At least one step suspended, none waiting/running | Parent re-raises SuspensionSignal (bubbles up HITL) |
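The three aggregate rules can be sketched as a pure function over per-step statuses. This is a simplified model; the status names "waiting", "running", "suspended", and "terminated" are assumed, and the real implementation derives the aggregate from subscription rows.

```python
def compute_aggregate(step_states: dict[str, str]) -> str:
    """Derive the aggregate stream state from per-step statuses.

    Mirrors the rules in the table above:
    - every step terminated            -> "completed"
    - any step waiting or running      -> "running"
    - suspended steps, nothing active  -> "suspended"
    """
    statuses = set(step_states.values())
    if statuses <= {"terminated"}:
        return "completed"
    if statuses & {"waiting", "running"}:
        return "running"
    if "suspended" in statuses:
        return "suspended"
    raise ValueError(f"unrecognized step statuses: {statuses}")
```

Note the precedence: a run with one suspended step and one waiting step is still "running", because an active step may yet unblock the suspended one.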
run_stream_workflow(...)
        │
        ▼
┌────────────────────────┐
│ Ensure subscriptions   │
│ exist for every step   │
└───────┬────────────────┘
        │
        ▼
┌────────────────────────┐
│ Run init() + body()    │  (first run only)
└───────┬────────────────┘
        │
        ▼
┌────────────────────────┐
│ Compute aggregate      │
└───────┬────────────────┘
        │
        ├─ "completed" ──────► Return StreamWorkflowResult
        │
        ├─ "suspended" ──────► Raise SuspensionSignal (HITL)
        │
        └─ "running"
                │
                ▼
        ┌────────────────────────┐
        │ await hook(...)        │  Parent worker released
        │ Record parent link     │
        └───────┬────────────────┘
                │  ... signals flow, steps execute ...
                ▼
        ┌──────────────────────────────┐
        │ Dispatcher sees terminal agg │
        │ Calls resume_hook(token)     │
        └───────┬──────────────────────┘
                │
                ▼
Parent re-enters run_stream_workflow,
recomputes aggregate, returns or re-raises.

StreamWorkflowResult

@dataclass
class StreamWorkflowResult:
    status: str                              # "completed"
    step_states: dict[str, str]              # step_run_id -> final status
    step_results: dict[str, Any]             # step_run_id -> set_result(...) payload

    def get_result(self, step_name: str) -> Any:
        """Look up a result by step name (matches against step_run_id)."""
Any value a step published via await set_result(...) in its lifecycle body is attached to the subscription row and surfaced here — the parent workflow does not need to read checkpoints.
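A minimal sketch of how get_result() can match a step name against step_run_id keys. The substring match and the run-id format ("planner_run_1") are assumptions made for illustration, not the library's actual lookup logic.

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class StreamWorkflowResult:
    status: str
    step_states: dict[str, str] = field(default_factory=dict)
    step_results: dict[str, Any] = field(default_factory=dict)

    def get_result(self, step_name: str) -> Any:
        # Assumption: the step_run_id embeds the step name, so a
        # substring match is enough to find the step's payload.
        for step_run_id, payload in self.step_results.items():
            if step_name in step_run_id:
                return payload
        return None

result = StreamWorkflowResult(
    status="completed",
    step_results={"planner_run_1": {"plan": ["fetch", "summarize"]}},
)
```

The parent then calls result.get_result("planner") instead of scanning step_results by exact run id.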

Backend Support

Stream workflows are fully supported on:
  • PostgreSQL — the primary target (migrations v5/v6/v7 add the required columns)
  • Citus — inherits from Postgres, with reference-table distribution
  • InMemory — for unit tests
Other SQL backends (SQLite, MySQL) and NoSQL backends (DynamoDB, Cassandra) accept the stream API for signature compatibility but do not scope subscriptions by stream_run_id. Running concurrent stream runs on those backends will cross-contaminate subscription state — stick to Postgres/Citus for production streams.

Scheduled Signals

Use schedule_signal() to emit a signal after a delay. A Celery beat task (pyworkflow.streams.drain_scheduled_signals, runs every 2s) polls the scheduled_signals table and emits due rows.
from pyworkflow.streams import schedule_signal

await schedule_signal(
    stream_id="agent_comms",
    signal_type="supervisor.wakeup",
    payload={"reason": "periodic_check"},
    delay_seconds=300,  # 5 minutes
    stream_run_id="stream_job_42",
)
This replaces in-process polling loops inside stream workflows — the scheduler is durable across worker restarts.
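One drain pass can be modeled as follows. The row shape and field names (due_at, emitted) are assumptions for illustration; the real beat task reads the scheduled_signals table.

```python
# Toy model of a drain pass: emit every row whose due time has
# passed, exactly once, and report how many were drained.
scheduled_signals = [
    {"due_at": 100.0, "signal_type": "supervisor.wakeup", "emitted": False},
    {"due_at": 900.0, "signal_type": "supervisor.wakeup", "emitted": False},
]

def drain_due(now: float, emit) -> int:
    """Emit each not-yet-emitted row with due_at <= now; return the count."""
    drained = 0
    for row in scheduled_signals:
        if not row["emitted"] and row["due_at"] <= now:
            emit(row["signal_type"])
            row["emitted"] = True
            drained += 1
    return drained
```

Marking rows emitted makes the pass idempotent: if the beat task fires twice in a row, the second pass is a no-op for already-drained signals.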
See Also

  • Stream Steps — the reactive units that subscribe to streams
  • Hooks — stream workflows use hook() internally for parent suspension
  • Workflows — stream workflows are driven from inside regular @workflows