Skip to content

refactor(kio)!: delete Consumer::write/produce, add shared Sender/Receiver#2074

Open
kixelated wants to merge 1 commit into
devfrom
claude/wonderful-matsumoto-68c93e
Open

refactor(kio)!: delete Consumer::write/produce, add shared Sender/Receiver#2074
kixelated wants to merge 1 commit into
devfrom
claude/wonderful-matsumoto-68c93e

Conversation

@kixelated

@kixelated kixelated commented Jul 3, 2026

Copy link
Copy Markdown
Collaborator

Summary

kio::Consumer documents itself as read-only but exposed write() and produce() â^@^T mutable back-doors that also re-exposed the Mut::deref_mut-marks-modified wake footgun (the one behind the wasm consume freeze) to external users. This deletes both, and closes the same hole in Weak.

Weak: drop the write path

kio::Weak was the same hole one level down: a single type carrying produce, write, the producer-side used/unused, and the consumer-side consume/read — all reachable from Consumer::weak(). So consumer.weak().produce() (or .write()) bypassed the deletion entirely.

Rename it to ProducerWeak (only a producer mints one), drop write (upgrade via produce() to write), and remove the unused Consumer::weak(). A consumer can no longer mint a weak at all, so there is no path to write access, direct or through a weak.

New: kio::shared

A two-sided shared-state channel. A Sender and Receiver both mutate one lock-guarded value by design (unlike the one-writer Producer/Consumer watch channel), with channel-style liveness: a wait resolves to None once the opposite side's handles are all gone, rather than blocking forever like a bare condvar. That's exactly the distinction a reverse queue needs (drain ends when senders vanish; enqueue reports when no handler exists).

Ported the three reverse-queue sites in moq-net

  • origin request_broadcast â^@^T the dynamic request queue.
  • broadcast BroadcastConsumer::track â^@^T needed a two-channel split: a Producer<()>/Consumer<()> liveness channel (read-only for consumers, drives closed()) plus a kio::shared registry+queue, so the handler's atomic drain-and-register-track survives.
  • track (fetch + subscriptions) â^@^T kept the forward TrackState watch channel (the streaming hot path) untouched; moved the reverse bits into kio::shared channels stored inside TrackState so nothing threads through the handle structs. Fetch carries a per-request result channel for rejection only (success routes through the cache), which deletes next_fetch, fetch_rejections, clear_group_request_rejection, and the request id. Subscriptions register via a pending_subs queue the producer drains into its persistent list.

Each site also deletes a manual dynamic: usize handler counter and its Clone/Drop bookkeeping â^@^T that's now the Receiver count.

Public API (kio, breaking â^F^R dev)

  • Removed: Consumer::write, Consumer::produce, Consumer::weak, and Weak (renamed to ProducerWeak, its write dropped).
  • Added: shared::{Sender, Receiver}; ProducerWeak (replacing Weak); Producer::weak() returns it.
  • moq-net's public surface is unchanged; only internal handle types and private state changed (TrackWeak/TrackDemand now hold ProducerWeak).

Behavior note

A request_broadcast for an unannounced path with no live handler now resolves Unroutable instead of Dropped (no test covered Dropped, and it matches the already-existing pending path, which maps channel-close to Unroutable).

Test plan

  • kio: 12 tests (incl. new shared unit tests)
  • moq-net: 410 lib + 4 doctests â^@^T all fetch/coalesce/accept-after-handler-dropped/subscription contract tests unchanged and green
  • downstream hang (18), moq-relay (351), moq-mux (132) green
  • cargo clippy clean; cargo check --workspace --all-targets clean

Follow-up considered, not done

kio's two channel families are now a bit inconsistent (Producer::write/poll vs Sender::lock/Receiver::poll_lock_when; the shared gate-poll uses &T+bool where the watch one uses &Ref+Poll<()>). Worth a naming/shape pass, but held out of this PR to keep it reviewable.

ð^_¤^V Generated with Claude Code (Written by Claude Opus 4.8)

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry @kixelated, you have reached your weekly rate limit of 500000 diff characters.

Please try again later or upgrade to continue using Sourcery

@kixelated kixelated force-pushed the claude/wonderful-matsumoto-68c93e branch 3 times, most recently from 3c62431 to 5ea4488 Compare July 4, 2026 00:28
…h, add shared channel

`kio::Consumer` documented itself read-only but exposed `write()` and
`produce()`, mutable back-doors that also re-exposed the
`Mut::deref_mut`-marks-modified wake footgun to external users. Delete both.

`kio::Weak` was the same hole one level down: a single type with `produce`,
`write`, and the producer-side `used`/`unused` *and* the consumer-side
`consume`/`read`, reachable from `Consumer::weak()`. So `consumer.weak().produce()`
bypassed the deletion. Rename it to `ProducerWeak` (only a producer mints one),
drop `write` (upgrade via `produce()` instead), and remove the unused
`Consumer::weak()`. A reader now has no path to write access, direct or weak.

Add `kio::shared`: a two-sided shared-state channel where a `Sender` and
`Receiver` both mutate one lock-guarded value by design, with channel-style
liveness (a wait resolves to `None` once the opposite side's handles are all
gone, rather than blocking forever like a bare condvar). This is the right
primitive for a reverse request queue, where the old code reached back through
a read handle.

Port the three reverse-queue sites in moq-net onto it:

- origin `request_broadcast`: the dynamic request queue.
- broadcast `Consumer::track`: split into a `Producer<()>`/`Consumer<()>`
  liveness channel (read-only for consumers) plus a `kio::shared` registry+queue,
  so the handler's atomic drain-and-register-track is preserved.
- track fetch + subscriptions: reverse channels stored inside `TrackState` so no
  handle threading. Fetch carries a per-request result channel for rejection only
  (success routes through the cache), deleting `next_fetch`/`fetch_rejections`/the
  request id. Subscriptions register via a `pending_subs` queue the producer
  drains into its persistent list.

Each site also deletes a manual `dynamic: usize` handler counter and its
Clone/Drop bookkeeping (replaced by the `Receiver` count). moq-net's public API
is unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@kixelated kixelated force-pushed the claude/wonderful-matsumoto-68c93e branch from 5ea4488 to 4055d3c Compare July 4, 2026 00:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant