diff --git a/Cargo.lock b/Cargo.lock index b19aa76dc..e5eef0019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3933,6 +3933,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minicov" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4869b6a491569605d66d3952bcdf03df789e5b536e5f0cf7758a7f08a55ae24d" +dependencies = [ + "cc", + "walkdir", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4248,6 +4258,7 @@ version = "0.1.14" dependencies = [ "bytes", "futures", + "getrandom 0.4.3", "kio", "num_enum", "rand 0.10.2", @@ -4256,6 +4267,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "wasm-bindgen-test", "web-async", "web-transport-trait", ] @@ -8544,6 +8556,45 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-bindgen-test" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a0d555ca874445df8d314f94f5c948a4e74e5418f332c89f660a3d8310a96f4" +dependencies = [ + "async-trait", + "cast", + "js-sys", + "libm", + "minicov", + "nu-ansi-term", + "num-traits", + "oorandom", + "serde", + "serde_json", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test-macro", + "wasm-bindgen-test-shared", +] + +[[package]] +name = "wasm-bindgen-test-macro" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94eb68555b95bcea5e8cf4abe280b529049479fa995bfc23734af96a6aedc120" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.118", +] + +[[package]] +name = "wasm-bindgen-test-shared" +version = "0.2.126" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31d56021e873866c968588ed85ccdf56db5c426e44afdb4618c39895104b920" + [[package]] name = "wasm-streams" version = "0.5.0" @@ -8567,15 +8618,14 @@ dependencies = [ "js-sys", "parking_lot", "pin-utils", - "slab", "wasm-bindgen", ] [[package]] name = "web-async" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f56ac33e792583916a8021e43e8a7e0987f5df7abc8f8afd72fcc361048755" +checksum = "1ba1426dcf56093a94d9415ba4c016e45c1323679501e7309f1919ab32f08206" dependencies = [ "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index d8f6508da..b51971219 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,10 +75,9 @@ moq-srt = { version = "0.1.0", path = "rs/moq-srt" } moq-token = { version = "0.6", path = "rs/moq-token" } moq-video = { version = "0.0.6", path = "rs/moq-video" } qmux = { version = "0.2", default-features = false } - serde = { version = "1", features = ["derive"] } tokio = "1.48" -web-async = { version = "0.1.3", features = ["tracing"] } +web-async = { version = "0.1.5", features = ["tracing"] } web-transport-iroh = "0.6" web-transport-noq = "0.2.0" web-transport-proto = "0.6" diff --git a/rs/moq-net/Cargo.toml b/rs/moq-net/Cargo.toml index 0b1ae664b..e73bd2c86 100644 --- a/rs/moq-net/Cargo.toml +++ b/rs/moq-net/Cargo.toml @@ -12,6 +12,9 @@ rust-version.workspace = true keywords = ["quic", "http3", "webtransport", "media", "live"] categories = ["multimedia", "network-programming", "web-programming"] +[package.metadata.cargo-shear] +ignored = ["getrandom"] + [features] # Legacy no-op: serde is now unconditional (stats publishing requires it). serde = [] @@ -29,3 +32,10 @@ tokio = { workspace = true, features = ["macros", "io-util", "sync", "test-util" tracing = "0.1" web-async = { workspace = true } web-transport-trait = { workspace = true } + +# wasm model-layer tests (tests/wasm.rs). getrandom's wasm backend is enabled +# only here (dev), so it isn't forced on downstream consumers. They select +# their own backend in the leaf binary. +[target.'cfg(target_family = "wasm")'.dev-dependencies] +getrandom = { version = "0.4", features = ["wasm_js"] } +wasm-bindgen-test = "0.3" diff --git a/rs/moq-net/src/ietf/control.rs b/rs/moq-net/src/ietf/control.rs index af6593aef..db27c97ba 100644 --- a/rs/moq-net/src/ietf/control.rs +++ b/rs/moq-net/src/ietf/control.rs @@ -35,7 +35,7 @@ impl Control { /// Allocate the next request_id, blocking until MAX_REQUEST_ID allows it. pub async fn next_request_id(&self) -> Result { - let timeout = tokio::time::sleep(std::time::Duration::from_secs(10)); + let timeout = web_async::time::sleep(std::time::Duration::from_secs(10)); tokio::pin!(timeout); loop { diff --git a/rs/moq-net/src/lite/publisher.rs b/rs/moq-net/src/lite/publisher.rs index 882d80d8c..5aa668d40 100644 --- a/rs/moq-net/src/lite/publisher.rs +++ b/rs/moq-net/src/lite/publisher.rs @@ -110,8 +110,8 @@ impl Publisher { const PROBE_MAX_AGE: Duration = Duration::from_secs(10); const PROBE_MAX_DELTA: f64 = 0.25; - let mut last_sent: Option<(u64, tokio::time::Instant)> = None; - let mut interval = tokio::time::interval(PROBE_INTERVAL); + let mut last_sent: Option<(u64, web_async::time::Instant)> = None; + let mut interval = web_async::time::interval(PROBE_INTERVAL); loop { tokio::select! { @@ -139,7 +139,7 @@ impl Publisher { if should_send { let rtt = session.stats().rtt().map(|d| d.as_millis() as u64); stream.writer.encode(&lite::Probe { bitrate, rtt }).await?; - last_sent = Some((bitrate, tokio::time::Instant::now())); + last_sent = Some((bitrate, web_async::time::Instant::now())); } } } diff --git a/rs/moq-net/src/model/time.rs b/rs/moq-net/src/model/time.rs index 53eae274b..5dc163927 100644 --- a/rs/moq-net/src/model/time.rs +++ b/rs/moq-net/src/model/time.rs @@ -4,7 +4,6 @@ use crate::Error; use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt}; use std::sync::LazyLock; -use std::time::{SystemTime, UNIX_EPOCH}; /// A timestamp representing the presentation time in milliseconds. /// @@ -186,11 +185,9 @@ impl Timescale { self.0.into_inner() == 0 } - /// Current time as a timestamp, derived from [`tokio::time::Instant::now`] so - /// it honors `tokio::time::pause` in tests. + /// Current time as a timestamp. pub fn now() -> Self { - // We use tokio so it can be stubbed for testing. - tokio::time::Instant::now().into() + web_async::time::Instant::now().into() } /// Convert this timestamp to a different scale. @@ -289,17 +286,20 @@ impl std::ops::SubAssign for Timescale { } // There's no zero Instant, so we need to use a reference point. -static TIME_ANCHOR: LazyLock<(std::time::Instant, SystemTime)> = LazyLock::new(|| { +static TIME_ANCHOR: LazyLock<(web_async::time::Instant, web_async::time::SystemTime)> = LazyLock::new(|| { // To deter nerds trying to use timestamp as wall clock time, we subtract a random amount of time from the anchor. // This will make our timestamps appear to be late; just enough to be annoying and obscure our clock drift. // This will also catch bad implementations that assume unrelated broadcasts are synchronized. let jitter = std::time::Duration::from_millis(rand::rng().random_range(0..69_420)); - (std::time::Instant::now(), SystemTime::now() - jitter) + ( + web_async::time::Instant::now(), + web_async::time::SystemTime::now() - jitter, + ) }); -// Convert an Instant to a Unix timestamp -impl From for Timescale { - fn from(instant: std::time::Instant) -> Self { +// Convert an Instant to a Unix timestamp. +impl From for Timescale { + fn from(instant: web_async::time::Instant) -> Self { let (anchor_instant, anchor_system) = *TIME_ANCHOR; // Conver the instant to a SystemTime. @@ -311,19 +311,13 @@ impl From for Timescale { // Convert the SystemTime to a Unix timestamp in nanoseconds. // We'll then convert that to the desired scale. system - .duration_since(UNIX_EPOCH) + .duration_since(web_async::time::UNIX_EPOCH) .expect("dude your clock is earlier than 1970") .try_into() .expect("dude your clock is later than 2116") } } -impl From for Timescale { - fn from(instant: tokio::time::Instant) -> Self { - instant.into_std().into() - } -} - impl Decode for Timescale { fn decode(r: &mut R, version: crate::Version) -> Result { let v = VarInt::decode(r, version)?; diff --git a/rs/moq-net/src/model/track.rs b/rs/moq-net/src/model/track.rs index 7df0fdab6..a3b23ff96 100644 --- a/rs/moq-net/src/model/track.rs +++ b/rs/moq-net/src/model/track.rs @@ -60,7 +60,7 @@ impl Track { #[derive(Default)] struct State { /// Groups in arrival order. `None` entries are tombstones for evicted groups. - groups: VecDeque>, + groups: VecDeque>, duplicates: HashSet, offset: usize, max_sequence: Option, @@ -174,7 +174,7 @@ impl State { /// non-max_sequence group (everything after it arrived even later). /// When max_sequence is at the front, we skip past it and tombstone expired groups /// behind it. - fn evict_expired(&mut self, now: tokio::time::Instant) { + fn evict_expired(&mut self, now: web_async::time::Instant) { for slot in self.groups.iter_mut() { let Some((group, created_at)) = slot else { continue }; @@ -246,7 +246,7 @@ impl TrackProducer { return Err(Error::Duplicate); } - let now = tokio::time::Instant::now(); + let now = web_async::time::Instant::now(); state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.sequence)); state.groups.push_back(Some((group.clone(), now))); state.evict_expired(now); @@ -269,7 +269,7 @@ impl TrackProducer { let group = Group { sequence }.produce(); - let now = tokio::time::Instant::now(); + let now = web_async::time::Instant::now(); state.duplicates.insert(sequence); state.max_sequence = Some(sequence); state.groups.push_back(Some((group.clone(), now))); diff --git a/rs/moq-net/src/session.rs b/rs/moq-net/src/session.rs index 744325c16..3ab30f81a 100644 --- a/rs/moq-net/src/session.rs +++ b/rs/moq-net/src/session.rs @@ -1,5 +1,6 @@ -use std::{future::Future, pin::Pin, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; +use web_async::MaybeSendBoxFuture; use web_transport_trait::Stats; use crate::{BandwidthConsumer, BandwidthProducer, Error, Version}; @@ -113,7 +114,7 @@ async fn run_send_bandwidth_inner(session: &S, return; } - let mut interval = tokio::time::interval(POLL_INTERVAL); + let mut interval = web_async::time::interval(POLL_INTERVAL); loop { tokio::select! { biased; @@ -136,9 +137,9 @@ async fn run_send_bandwidth_inner(session: &S, } // We use a wrapper type that is dyn-compatible to remove the generic bounds from Session. -trait SessionInner: Send + Sync { +trait SessionInner: web_transport_trait::MaybeSend + web_transport_trait::MaybeSync { fn close(&self, code: u32, reason: &str); - fn closed(&self) -> Pin + Send + '_>>; + fn closed(&self) -> MaybeSendBoxFuture<'_, String>; } impl SessionInner for S { @@ -146,7 +147,7 @@ impl SessionInner for S { S::close(self, code, reason); } - fn closed(&self) -> Pin + Send + '_>> { + fn closed(&self) -> MaybeSendBoxFuture<'_, String> { Box::pin(async move { S::closed(self).await.to_string() }) } } diff --git a/rs/moq-net/src/stats.rs b/rs/moq-net/src/stats.rs index 8f980df36..6aa7d12f1 100644 --- a/rs/moq-net/src/stats.rs +++ b/rs/moq-net/src/stats.rs @@ -1235,8 +1235,8 @@ async fn run_publisher( drop(shared); } - let mut ticker = tokio::time::interval(interval); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + let mut ticker = web_async::time::interval(interval); + ticker.set_missed_tick_behavior(web_async::time::MissedTickBehavior::Delay); loop { ticker.tick().await; diff --git a/rs/moq-net/tests/wasm.rs b/rs/moq-net/tests/wasm.rs new file mode 100644 index 000000000..f59e49793 --- /dev/null +++ b/rs/moq-net/tests/wasm.rs @@ -0,0 +1,62 @@ +//! wasm32 model-layer tests. +//! +//! moq-net's model layer (Origin/Broadcast/Track/Group/Frame) is transport- +//! independent, so it can be exercised in-process on wasm without a +//! WebTransport session. This covers both directions (produce + consume) plus +//! the wasm timestamp clock that the producer path depends on. +//! +//! Run (bypassing `wasm-pack test`, which builds the crate's native-only lib +//! unit tests too. They use `tokio::spawn` and don't compile on wasm): +//! +//! ```sh +//! CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_RUNNER=wasm-bindgen-test-runner \ +//! RUSTFLAGS='--cfg=web_sys_unstable_apis --cfg=getrandom_backend="wasm_js"' \ +//! cargo test --test wasm -p moq-net --target wasm32-unknown-unknown +//! ``` +//! +//! Runs under Node (default). `performance.now()` / `Date.now()` back the +//! clock there just as in a browser; these model-layer tests need no +//! WebTransport. Add `wasm_bindgen_test_configure!(run_in_browser)` to run under +//! headless Chrome (the subscriber's real environment) once chromedriver is set. +#![cfg(target_arch = "wasm32")] + +use bytes::Bytes; +use moq_net::{Broadcast, Time, Track}; +use wasm_bindgen_test::*; + +/// The producer timestamp clock works on wasm: `Time::now()` (which flows through +/// the web_async clock) returns a sane, non-decreasing wall-clock time. +/// On the old code this panicked (`std::time` has no clock on wasm32). +#[wasm_bindgen_test] +fn timescale_now_is_sane_and_monotonic() { + let a = Time::now(); + let b = Time::now(); + + // A real post-2020 wall-clock time in ms (2020-01-01 = 1_577_836_800_000). + // Proves the web_async wall clock actually resolved. + assert!( + a.as_millis() > 1_577_836_800_000, + "timestamp before 2020. wall clock not working: {}", + a.as_millis() + ); + // Monotonic non-decreasing. + assert!(b >= a, "time went backwards: {} < {}", b.as_millis(), a.as_millis()); +} + +/// Bidirectional model round-trip in-process on wasm: produce a track + frame, +/// then consume it back. Exercises the produce path (which stamps groups via the +/// wasm `web_async::time` clock) and the consume path together. +#[wasm_bindgen_test] +async fn produce_consume_frame_roundtrip() { + let mut broadcast = Broadcast::new().produce(); + let mut track = broadcast.create_track(Track::new("stream")).unwrap(); + let consumer = broadcast.consume(); + let mut sub = consumer.subscribe_track(&Track::new("stream")).unwrap(); + + // Producer side: write a frame (creates a group, timestamped via the wasm clock). + track.write_frame(Bytes::from_static(b"hello-wasm")).unwrap(); + + // Consumer side: read it back. + let frame = sub.read_frame().await.unwrap(); + assert_eq!(frame.as_deref(), Some(&b"hello-wasm"[..]), "frame did not round-trip"); +}