
Sample code for Nexus messaging#290

Merged
Evanthx merged 28 commits into main from signals-nexus-python
May 4, 2026
Conversation

Contributor

@Evanthx Evanthx commented Apr 15, 2026

This is sample code to show two ways to send messages (signals, queries, and updates) through Nexus.

@Evanthx Evanthx requested a review from a team as a code owner April 15, 2026 18:31
@Evanthx Evanthx force-pushed the signals-nexus-python branch from d11d1a0 to 83386cf Compare April 15, 2026 22:57
Contributor

@VegetarianOrc VegetarianOrc left a comment


Code looks okay to me! A few comments throughout.

Left a couple of comments to help me understand the motivation for the structure of the samples as well.

I'd like to clear up the intention around removing the sync operation sample before approving.

Comment thread nexus_messaging/callerpattern/caller/workflows.py Outdated
Comment thread nexus_sync_operations/caller/workflows.py Outdated
Comment thread nexus_messaging/callerpattern/handler/workflows.py Outdated
Comment thread nexus_messaging/callerpattern/README.md Outdated
Comment thread nexus_messaging/callerpattern/README.md Outdated
Comment on lines +12 to +16
The caller workflow:
1. Queries for supported languages (`get_languages` -- backed by a `@workflow.query`)
2. Changes the language to Arabic (`set_language` -- backed by a `@workflow.update` that calls an activity)
3. Confirms the change via a second query (`get_language`)
4. Approves the workflow (`approve` -- backed by a `@workflow.signal`)
Contributor


Nit: The caller workflow only executes Nexus operations, so it feels a bit confusing to call out that they're backed by workflow.query/update/signal. IMO that info should be called out alongside the service handler.

Contributor Author


Since this is demonstrating queries, updates, and signals, this does actually feel useful to me, though.

Comment thread nexus_messaging/callerpattern/handler/worker.py
Comment thread nexus_messaging/callerpattern/handler/service_handler.py
Comment thread nexus_messaging/ondemandpattern/README.md
@Evanthx Evanthx requested a review from VegetarianOrc April 21, 2026 16:51
Comment thread nexus_messaging/ondemandpattern/handler/workflows.py Outdated
Evanthx and others added 6 commits April 23, 2026 16:14
* Nexus samples: use business IDs for workflow IDs.

Stop using the Nexus request ID (nexus_cancel) or bare uuid4 (hello_nexus,
nexus_multiple_args) as the backing workflow ID. Build a meaningful business
ID from the operation input and append a uuid4 suffix for uniqueness.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* remove UUID from the nexus sample workflow ids to better communicate intent

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
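The two workflow-ID shapes discussed in these commits can be sketched roughly as follows. This is a minimal illustration with hypothetical helper names, not code from the samples themselves:

```python
import uuid


def workflow_id_with_suffix(business_key: str) -> str:
    # First commit's shape: a business-meaningful prefix plus a
    # uuid4 suffix for uniqueness.
    return f"{business_key}-{uuid.uuid4()}"


def workflow_id_plain(business_key: str) -> str:
    # Follow-up commit's shape: drop the suffix so the ID communicates
    # intent directly; uniqueness then relies on the business key itself.
    return business_key


wf_id = workflow_id_with_suffix("greet-Alice")
print(wf_id.startswith("greet-Alice-"))  # True
```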
jmaeagle99 and others added 9 commits May 1, 2026 11:34
* Add workflow_streams samples: order_workflow scenario

Initial samples directory for temporalio.contrib.workflow_streams,
the workflow-hosted durable event stream contrib (experimental,
contrib/pubsub branch of sdk-python).

The order_workflow scenario covers the basic publisher path: a
workflow binds a typed topic in @workflow.init, an activity
publishes events via the topic handle, and a starter subscribes
with WorkflowStreamClient and prints events as they arrive.

Also enables the uv supply-chain cooldown options in the lockfile.

* samples: workflow_stream: add reconnecting-subscriber scenario

Adds a second scenario demonstrating the central Workflow Streams use
case: a consumer disconnects mid-stream and resumes later via
subscribe(from_offset=...), with no events lost or duplicated. The
existing OrderWorkflow finishes too quickly to make the pattern
visible, so this introduces a multi-stage PipelineWorkflow paced with
workflow.sleep between stages.

The runner reads a couple of events, persists item.offset + 1 to a
temp file, sleeps "disconnected" while the workflow keeps publishing,
then opens a fresh Client + WorkflowStreamClient and resumes from the
persisted offset — the same shape that works across actual process
restarts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: workflow_stream: add external-publisher scenario

Adds a third scenario covering the third publisher shape: a backend
service or scheduled job pushing events into a workflow it didn't
itself start. The earlier scenarios publish either from inside the
workflow or from one of its activities; this one uses
WorkflowStreamClient.create() externally.

HubWorkflow is a passive stream host — it does no work of its own and
just waits to be told to close, fitting the event-bus pattern. The
runner publishes a series of news headlines, runs a subscriber task
alongside, signals close, and exits when both tasks complete.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: workflow_stream: add truncating-ticker scenario

Adds a fourth scenario for long-running workflows that need to bound
their event log: the workflow publishes events at a fixed cadence and
calls self.stream.truncate(...) periodically to keep only the most
recent entries.

The runner subscribes twice — fast and slow — to make the trade
visible: the fast subscriber sees every offset in order; the slow one
falls behind a truncation, has its iterator transparently jump forward
to the new base offset, and shows the offset gap that intermediate
events fell into. This is the model for high-volume long-running
streams: bounded log size, slow consumers may miss intermediate events
but always see the most recent state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: rename workflow_stream → workflow_streams; migrate to topic handles

- Directory and module path renamed to plural to match sdk-python
  `temporalio.contrib.workflow_streams` rename.
- Workflow-side: bind a typed topic handle in `@workflow.init` and call
  `topic.publish(value)` — the removed `WorkflowStream.publish` form is
  gone. Same change applied to the activity and external-publisher.
- Activity: `WorkflowStreamClient.from_activity()` →
  `from_within_activity()`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* samples: workflow_streams review polish

- README: fix scenario count (two -> four), document subscriber start
  position and continue-as-new semantics for stream_state
- hub_workflow: drop stale comment referencing a README race note
  that does not exist in this sample
- payment_activity: trim long publisher_id/dedup caveat — moved out
  of the first sample's docstring to keep it approachable

* workflow_streams: deliver terminal events + fix run_publisher subscribe shape

End-to-end runs of the four workflow_streams scenarios surfaced two
sample-side issues, both fixed here.

run_publisher's consumer asserted ``isinstance(item.data, Payload)`` and
called ``payload_converter.from_payload(item.data, T)``. The contrib's
``subscribe()`` defaults to converter-decoded data, not raw payloads,
so this assertion fired on the first run. Switch to
``result_type=RawValue`` (the documented escape hatch for heterogeneous
topics) and read ``item.data.payload``.

Items published in the same workflow task that returns from
``@workflow.run`` were not delivered to subscribers — the in-memory
log dies with the workflow and the next subscriber poll lands on a
completed workflow. Fix: each scenario now uses an in-band terminator
that subscribers break on, and each workflow holds the run open with
``await workflow.sleep(timedelta(milliseconds=500))`` so that final
publish is fetched before the workflow exits:

- OrderWorkflow / PipelineWorkflow: the workflow's own
  ``StatusEvent(kind="complete")`` / ``StageEvent(stage="complete")``
  is the terminator (consumers already broke on it).
- HubWorkflow: the *publisher* in run_external_publisher emits a
  sentinel ``NewsEvent(headline="__done__")`` immediately before
  signaling close; the consumer breaks on the sentinel.
- TickerWorkflow: the final tick (n == count - 1) is the terminator;
  ``keep_last`` guarantees that offset survives the last truncation,
  so even slow consumers reach it.

Because subscribers stop polling on the terminator, by the time
``workflow.run`` returns there are no in-flight poll handlers — no
``UnfinishedUpdateHandlersWarning`` from the SDK and no need for
``detach_pollers()`` / ``wait_condition(all_handlers_finished)`` in
the workflow exit path.

Two consecutive end-to-end runs of all four scenarios pass cleanly
against ``temporal server start-dev --headless``.
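The in-band terminator pattern described above can be shown with a library-agnostic sketch. An `asyncio.Queue` stands in for the stream subscription; the real samples use `WorkflowStreamClient.subscribe()`:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StatusEvent:
    kind: str


async def subscriber(queue: "asyncio.Queue[StatusEvent]") -> list[str]:
    seen: list[str] = []
    while True:
        item = await queue.get()
        seen.append(item.kind)
        if item.kind == "complete":
            # In-band terminator: stop polling before the workflow
            # exits, so no poll handler is left in flight.
            break
    return seen


async def main() -> list[str]:
    q: "asyncio.Queue[StatusEvent]" = asyncio.Queue()
    for kind in ("created", "charged", "complete"):
        q.put_nowait(StatusEvent(kind))
    return await subscriber(q)


print(asyncio.run(main()))  # ['created', 'charged', 'complete']
```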

* workflow_streams README: document the stream-end pattern

Subscribers don't exit on their own when the host workflow completes —
they need an in-band terminator, and the workflow needs to hold open
briefly so the final publish is fetched before run() returns. Both
pieces show up in every scenario here, so document them in one place
and update scenario 3's description to mention the sentinel headline
the publisher emits.

* samples: workflow_streams: README and wheel packages cleanup

Now that temporalio 1.27.0 has shipped (and main has bumped to it in
#302), drop the README's "install sdk-python from a branch" callout
and point at >=1.27.0 instead. Also add workflow_streams to the wheel
packages list alongside the other samples.

* samples: workflow_streams: drop force_flush=True from charge_card

The activity's final publish was using force_flush=True, which sets the
flush_event so the background flusher fires immediately. Triggering a
flush right before __aexit__ runs the activity into the
WorkflowStreamClient's cancel-mid-flush path: __aexit__ cancels the
flusher task while it's awaiting the publish signal RPC, the cancel
propagates into the in-flight signal, and the activity hangs until the
StartToClose timeout fires. Empirically the workflow then retries the
activity indefinitely.

Without force_flush=True the buffered "card charged" event flushes via
the regular 200ms batch interval and the flusher is sleeping in
wait_for(...) when __aexit__ cancels it — a clean cancellation path.
The user-visible publish ordering is unchanged.

The underlying SDK bug should be fixed separately by switching __aexit__
from cancel() to a cooperative-stop flag so the in-flight signal
completes before the flusher exits.
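The cooperative-stop shape suggested for the SDK fix can be sketched in plain asyncio (names here are illustrative, not the contrib's internals): the flusher loop checks an `asyncio.Event` instead of being cancelled mid-RPC, so an in-flight publish can complete before the task exits.

```python
import asyncio


async def flusher(stop: asyncio.Event, flushed: list[str]) -> None:
    # Cooperative stop: the loop wakes on either the batch interval
    # or the stop flag, then drains one last batch and returns.
    while not stop.is_set():
        try:
            await asyncio.wait_for(stop.wait(), timeout=0.2)
        except asyncio.TimeoutError:
            pass
        flushed.append("batch")  # stand-in for the publish signal RPC


async def main() -> list[str]:
    flushed: list[str] = []
    stop = asyncio.Event()
    task = asyncio.create_task(flusher(stop, flushed))
    await asyncio.sleep(0.05)
    stop.set()   # cooperative stop instead of task.cancel()
    await task   # flusher finishes its final flush and returns cleanly
    return flushed


print(len(asyncio.run(main())) >= 1)  # True
```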

* samples: workflow_streams: drop temp-file resume offset; add stats column

The reconnecting-subscriber demo previously persisted its resume offset
to a temp file between phases. Inside one process that's theatrical:
the disconnect/reconnect shape comes from creating a fresh Client +
WorkflowStreamClient with from_offset=N, not from where N happens to be
stored. Replace the file with a local int and a comment about durable
storage in production (a DB row keyed by user_id/run_id, etc.).

Restructure output around a stats column so the demo conveys what's
happening to the stream at all times, not just between phases. A
background poller calls WorkflowStreamClient.get_offset() throughout
and emits a heartbeat line once a second; every emit prints current
proc/avail/pend in a left column followed by the phase or event
message. Watching pend grow during the disconnect window and shrink
again as phase 2 catches up is the demo's core point.
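The disconnect/reconnect shape this commit describes (persist `offset + 1`, then resume a fresh subscription from it) can be sketched with a stand-in async generator in place of `WorkflowStreamClient.subscribe(from_offset=...)`:

```python
import asyncio
from typing import AsyncIterator

EVENTS = ["e0", "e1", "e2", "e3", "e4"]


async def subscribe(from_offset: int = 0) -> AsyncIterator[tuple[int, str]]:
    # Stand-in for the contrib's subscribe(from_offset=...):
    # yields (offset, event) pairs starting at the requested offset.
    for off in range(from_offset, len(EVENTS)):
        yield off, EVENTS[off]


async def main() -> list[str]:
    seen: list[str] = []
    resume_offset = 0  # local int; in production, a DB row keyed by user/run

    # Phase 1: read two events, persist item offset + 1, then "disconnect".
    async for off, ev in subscribe(from_offset=resume_offset):
        seen.append(ev)
        resume_offset = off + 1
        if len(seen) == 2:
            break

    # Phase 2: a fresh subscription resumes from the persisted offset,
    # with no events lost or duplicated.
    async for off, ev in subscribe(from_offset=resume_offset):
        seen.append(ev)
    return seen


print(asyncio.run(main()))  # ['e0', 'e1', 'e2', 'e3', 'e4']
```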

* samples: workflow_streams: surface multiple truncation jumps in ticker

The truncating-ticker demo is meant to make the bounded-log trade
visible: fast subscriber sees every event, slow subscriber loses
intermediate ones to truncation. The previous parameters
(truncate_every=5, keep_last=3, interval_ms=400, slow_delay=1.5s)
produced at most one tiny jump near the end of the run — easy to miss.

Tighter parameters (truncate_every=2, keep_last=1, interval_ms=200,
count=30) keep the workflow log at one or two entries between
truncations. That shrinks the slow subscriber's per-poll batch, so it
re-polls more often, and most polls land after a truncation that has
passed its position. The result is several visible jumps over the
demo, not a single batched one at the end.

Switch the output to two lanes (fast on the left, slow on the right
with explicit "↪ jumped offset=N → M (K dropped)" markers) so the
divergence reads at a glance instead of being lost in interleaved
single-stream output. Also extend the docstring to call out the
opposite trade — never truncating means slow consumers eventually
catch up at the cost of unbounded workflow history — so readers know
when this pattern is the wrong fit.
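The slow-lane jump marker described above boils down to a small formatting helper. This sketch uses ASCII arrows rather than the sample's glyphs; the function name and format are this sketch's own:

```python
def jump_marker(expected: int, actual: int) -> str:
    # When a truncation has passed the subscriber's position, the
    # iterator jumps from the offset it expected next to the new base
    # offset; the gap is the number of intermediate events dropped.
    dropped = actual - expected
    return f"jumped offset={expected} -> {actual} ({dropped} dropped)"


print(jump_marker(4, 9))  # jumped offset=4 -> 9 (5 dropped)
```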

* samples: workflow_streams: add LLM-streaming scenario

Adds a fifth scenario to workflow_streams/ that streams an OpenAI
chat completion to the terminal through a Workflow Stream. Activity
is the publisher (it owns the non-deterministic API call), workflow
hosts the stream and runs the activity, runner subscribes and
renders to stdout as deltas arrive.

Layout:

* `chat_shared.py` — types and topics for this scenario, kept out of
  the cross-scenario `shared.py` because no other scenario uses them
* `workflows/chat_workflow.py` — `ChatWorkflow` runs `stream_completion`
  with `RetryPolicy(maximum_attempts=3)` and the same 500ms hold-open
  pattern the other four samples use
* `activities/chat_activity.py` — `stream_completion` calls
  `AsyncOpenAI(...).chat.completions.create(stream=True)` with
  `gpt-5-mini`, publishes each token chunk on the `delta` topic, the
  full text on `complete`, and a `RetryEvent` on `retry` when running
  on attempt > 1. `force_flush=True` is intentionally omitted to
  avoid the `__aexit__` cancel-mid-flight hang in
  `temporalio.contrib.workflow_streams` 1.27.0; the 200ms
  `batch_interval` is fast enough for an interactive feel.
* `run_chat.py` — subscribes to all three topics, prints deltas to
  stdout as they stream, and on a retry event uses plain ANSI
  escapes (`\033[<n>A`, `\033[J`) to rewind the rendered output
  before the retried attempt re-publishes
* `run_chat_worker.py` — runs on its own task queue
  (`workflow-stream-chat-task-queue`), registering only
  `ChatWorkflow` and `stream_completion`; the openai dependency and
  the `OPENAI_API_KEY` requirement stay isolated to this one
  scenario

The split worker also makes the retry-handling demo trivial to run:
the user kills the chat worker mid-stream, brings it back up, and
the activity retries — no synthetic failure injection needed.

Adds `chat-stream = ["openai>=1.0,<2"]` as a new optional
dependency group; `uv sync --group chat-stream` and an
`OPENAI_API_KEY` are documented in the README.

* samples: workflow_streams: drop chat-stream openai upper cap

openai-agents (the existing langsmith-tracing / openai-agents extras)
already pulls openai>=2.26.0. Capping chat-stream at openai<2 made
the two extras unsatisfiable together. Drop the cap; the chat
activity uses APIs that are stable across openai 1.x and 2.x.

* samples: workflow_streams: chat consumer header + cursor save/restore

Two display fixes for run_chat.py:

1. Print a header line right after start_workflow so the user sees
   immediate feedback ("[chat <id>] streaming response from gpt-5-mini,
   awaiting first token...") instead of a blank screen until the first
   delta arrives.

2. Replace the newline-counting ANSI clear with cursor save/restore
   (\033[s / \033[u\033[J). The previous version counted text newlines
   to decide how far up to move the cursor on retry, which undercounts
   when the terminal has wrapped long lines — the failed attempt's
   first wrapped lines stayed on screen above the retry marker.
   save/restore rewinds to a fixed position regardless of wrapping.

Bumps the prompt to a 500-word distributed-systems comparison
(Paxos vs Raft vs Viewstamped Replication) so there is enough output
to comfortably kill the worker mid-stream and watch the retried
attempt re-render from scratch.
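The cursor save/restore fix relies on two standard ANSI control sequences. A minimal sketch of the idea (helper names are illustrative, not from run_llm.py):

```python
SAVE_CURSOR = "\033[s"               # save current cursor position
RESTORE_AND_CLEAR = "\033[u\033[J"   # restore position, clear to end of screen


def begin_attempt() -> str:
    # Emitted once before streaming an attempt's output.
    return SAVE_CURSOR


def rewind_attempt() -> str:
    # On retry: jump back to the saved position and wipe the failed
    # attempt's output, regardless of how the terminal wrapped lines.
    return RESTORE_AND_CLEAR


print(rewind_attempt() == "\x1b[u\x1b[J")  # True
```

Unlike counting printed newlines, the saved position is fixed, so line wrapping in the terminal cannot cause an undercount.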

* samples: workflow_streams: rename chat -> llm in scenario 5

"Chat" implies multi-turn conversation. The new scenario is a
one-shot LLM completion stream, not a chat. Rename to make the
scope clear:

- chat_shared.py             -> llm_shared.py
- workflows/chat_workflow.py -> workflows/llm_workflow.py
- activities/chat_activity.py -> activities/llm_activity.py
- run_chat.py                -> run_llm.py
- run_chat_worker.py         -> run_llm_worker.py
- ChatInput / ChatWorkflow   -> LLMInput / LLMWorkflow
- CHAT_TASK_QUEUE            -> LLM_TASK_QUEUE
  ("workflow-stream-chat-task-queue" -> "workflow-stream-llm-task-queue")
- chat-stream extra          -> llm-stream
- workflow id prefix
  workflow-stream-chat-...   -> workflow-stream-llm-...

The activity's `stream_completion` defn name and the topic
constants (`delta`, `complete`, `retry`) stay the same — those
already describe what they do without the "chat" framing.
README, docstrings, and run instructions updated to match.

* samples: workflow_streams: race the LLM consumer with workflow result

If the LLM activity exhausts its retries (bad OPENAI_API_KEY,
provider outage, etc.), the workflow fails before the activity
publishes the `complete` terminator. The consumer's previous
async-for loop only exited on `complete`, so the script blocked
indefinitely on a terminator that would never arrive instead of
surfacing the workflow failure.

Wrap the subscriber in a `consume()` coroutine and run it through
the existing `race_with_workflow` helper (the same pattern
`run_publisher.py` uses): if the workflow finishes first the
subscriber gets cancelled and the workflow's exception propagates;
if the subscriber sees `complete` first, the helper waits for the
workflow result and returns it.

Found in a Codex code review of today's workflow_streams changes.

* samples: workflow_streams: drop race_with_workflow helper

The helper wrapped the consumer in an asyncio.gather that cancelled
the subscriber when the workflow result settled — defensive logic
for a case the SDK already handles. WorkflowStreamClient.subscribe()
exits cleanly on every workflow terminal state (return,
continue-as-new, failure) via its AcceptedUpdateCompletedWorkflow,
WorkflowUpdateRPCTimeoutOrCancelledError, and NOT_FOUND branches in
sdk-python. The async-for loop ends naturally when the workflow
terminates without a publish, so we don't need a separate task to
race against handle.result().

Replace the helper with the obvious shape in both runners:

    async for item in stream.subscribe(...):
        ...
        if item.is_terminator:
            break

    result = await handle.result()  # raises on workflow failure

Either path reaches handle.result(): an explicit break on the
in-band terminator (workflow still running, hold-open lets the
poll deliver the event), or the iterator naturally exhausting when
the workflow has already terminated. handle.result() then either
returns or raises the workflow's failure — covering the LLM
"activity exhausted retries" case that prompted the helper to be
added in the first place.

Smoke tested:

    uv run workflow_streams/run_publisher.py
    uv run workflow_streams/run_llm.py

* samples: workflow_streams: reorganize README; drop closing section

Two fixes:

1. Reorganize so the README doesn't jump back and forth between
   scenarios. The previous shape introduced 1-4, then put scenario
   5's full description plus its setup and run instructions inline,
   then jumped back to a "Run it" section that only covered 1-4.
   New shape: all five scenarios up front (parallel structure), one
   unified "Run it" section that covers worker setup for both
   groups and all five runner scripts in one block, then expected
   output, then notes.

2. Drop the inline "Ending the stream" section. The same material
   is in documentation/docs/develop/python/libraries/workflow-streams.mdx
   under the "Closing the stream" anchor, so the README links there
   from the Notes block instead of duplicating the explanation.

The scenario 5 "split-out worker" rationale (extra dependency,
secret, retry-via-Ctrl-C) collapses to a single sentence at the end
of its bullet block.

* samples: workflow_streams: drop README Notes section

The Notes block (subscriber start position, continue-as-new,
closing the stream) was a small docs summary tacked onto the end of
the README. The samples themselves cover these points: docstrings
in each runner / workflow / activity explain the from_offset
behavior, the stream_state field, and the in-band terminator +
hold-open pattern. Readers who want the full conceptual treatment
go to the docs page; the README sticks to "what the scenarios are
and how to run them".

* samples: workflow_streams: lock llm-stream dependency group

The llm-stream dependency group was introduced in pyproject.toml
without a corresponding uv.lock update, so `uv sync --frozen --group
llm-stream` would fail or force a relock before scenario 5 could
run. Add the two missing entries (the package-optional-dependencies
list and the package-metadata requires-dev list) so frozen installs
work against the committed lock.

Found in a Codex review of the day's workflow_streams changes.

* samples: workflow_streams: fix lint failures (ruff isort + format)

CI's `poe lint` step was failing on three small things across four
files:

* `run_external_publisher.py`, `ticker_workflow.py`: ruff isort
  (`I001`) wanted the `workflow_streams.shared` imports re-sorted
  and a stray blank line removed. Apply the auto-fix.
* `run_external_publisher.py`, `run_reconnecting_subscriber.py`,
  `run_truncating_ticker.py`: ruff format wanted three line-wrapped
  function calls collapsed back to single lines. Apply the
  formatter.
* `run_truncating_ticker.py`: the formatter joined an adjacent
  pair of f-strings into an awkward `f"..." f"..."` one-liner.
  Consolidate them into a single f-string for readability — the
  resulting line is comfortably under the 88-char limit.

`poe lint` (ruff isort + ruff format --check + mypy
--all-groups --check-untyped-defs) now passes locally.

* samples: workflow_streams: drop BFF jargon and Expected output block

Two README/comment cleanups:

* "BFF" (backend-for-frontend) is not a widely-known term outside
  certain front-end-architecture circles. Replace with the more
  obvious "web backends" in the README intro and "production web
  backend" in the run_reconnecting_subscriber.py comment about
  where the resume offset would live durably.

* Drop the "Expected output" section. It only covered scenarios 1
  and 2; with five scenarios it is no longer pulling its weight.
  Anyone running the script can see the output for themselves.

* Apply suggestion from @brianstrauch

* Apply suggestion from @brianstrauch

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Brian Strauch <brian@brianstrauch.com>
@Evanthx Evanthx merged commit 6b8e441 into main May 4, 2026
11 checks passed
@Evanthx Evanthx deleted the signals-nexus-python branch May 4, 2026 18:13
