Skip to content
Merged
56 changes: 53 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,9 @@ moq-srt = { version = "0.1.0", path = "rs/moq-srt" }
moq-token = { version = "0.6", path = "rs/moq-token" }
moq-video = { version = "0.0.6", path = "rs/moq-video" }
qmux = { version = "0.2", default-features = false }

serde = { version = "1", features = ["derive"] }
tokio = "1.48"
web-async = { version = "0.1.3", features = ["tracing"] }
web-async = { version = "0.1.5", features = ["tracing"] }
web-transport-iroh = "0.6"
web-transport-noq = "0.2.0"
web-transport-proto = "0.6"
Expand Down
10 changes: 10 additions & 0 deletions rs/moq-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ rust-version.workspace = true
keywords = ["quic", "http3", "webtransport", "media", "live"]
categories = ["multimedia", "network-programming", "web-programming"]

[package.metadata.cargo-shear]
ignored = ["getrandom"]

[features]
# Legacy no-op: serde is now unconditional (stats publishing requires it).
serde = []
Expand All @@ -29,3 +32,10 @@ tokio = { workspace = true, features = ["macros", "io-util", "sync", "test-util"
tracing = "0.1"
web-async = { workspace = true }
web-transport-trait = { workspace = true }

# wasm model-layer tests (tests/wasm.rs). getrandom's wasm backend is enabled
# only here (dev), so it isn't forced on downstream consumers. They select
# their own backend in the leaf binary.
[target.'cfg(target_family = "wasm")'.dev-dependencies]
getrandom = { version = "0.4", features = ["wasm_js"] }
wasm-bindgen-test = "0.3"
2 changes: 1 addition & 1 deletion rs/moq-net/src/ietf/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Control {

/// Allocate the next request_id, blocking until MAX_REQUEST_ID allows it.
pub async fn next_request_id(&self) -> Result<RequestId, Error> {
let timeout = tokio::time::sleep(std::time::Duration::from_secs(10));
let timeout = web_async::time::sleep(std::time::Duration::from_secs(10));
tokio::pin!(timeout);

loop {
Expand Down
6 changes: 3 additions & 3 deletions rs/moq-net/src/lite/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ impl<S: web_transport_trait::Session> Publisher<S> {
const PROBE_MAX_AGE: Duration = Duration::from_secs(10);
const PROBE_MAX_DELTA: f64 = 0.25;

let mut last_sent: Option<(u64, tokio::time::Instant)> = None;
let mut interval = tokio::time::interval(PROBE_INTERVAL);
let mut last_sent: Option<(u64, web_async::time::Instant)> = None;
let mut interval = web_async::time::interval(PROBE_INTERVAL);

loop {
tokio::select! {
Expand Down Expand Up @@ -139,7 +139,7 @@ impl<S: web_transport_trait::Session> Publisher<S> {
if should_send {
let rtt = session.stats().rtt().map(|d| d.as_millis() as u64);
stream.writer.encode(&lite::Probe { bitrate, rtt }).await?;
last_sent = Some((bitrate, tokio::time::Instant::now()));
last_sent = Some((bitrate, web_async::time::Instant::now()));
}
}
}
Expand Down
28 changes: 11 additions & 17 deletions rs/moq-net/src/model/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::Error;
use crate::coding::{Decode, DecodeError, Encode, EncodeError, VarInt};

use std::sync::LazyLock;
use std::time::{SystemTime, UNIX_EPOCH};

/// A timestamp representing the presentation time in milliseconds.
///
Expand Down Expand Up @@ -186,11 +185,9 @@ impl<const SCALE: u64> Timescale<SCALE> {
self.0.into_inner() == 0
}

/// Current time as a timestamp, derived from [`tokio::time::Instant::now`] so
/// it honors `tokio::time::pause` in tests.
/// Current time as a timestamp.
pub fn now() -> Self {
// We use tokio so it can be stubbed for testing.
tokio::time::Instant::now().into()
web_async::time::Instant::now().into()
}

/// Convert this timestamp to a different scale.
Expand Down Expand Up @@ -289,17 +286,20 @@ impl<const SCALE: u64> std::ops::SubAssign for Timescale<SCALE> {
}

// There's no zero Instant, so we need to use a reference point.
static TIME_ANCHOR: LazyLock<(std::time::Instant, SystemTime)> = LazyLock::new(|| {
static TIME_ANCHOR: LazyLock<(web_async::time::Instant, web_async::time::SystemTime)> = LazyLock::new(|| {
// To deter nerds trying to use timestamp as wall clock time, we subtract a random amount of time from the anchor.
// This will make our timestamps appear to be late; just enough to be annoying and obscure our clock drift.
// This will also catch bad implementations that assume unrelated broadcasts are synchronized.
let jitter = std::time::Duration::from_millis(rand::rng().random_range(0..69_420));
(std::time::Instant::now(), SystemTime::now() - jitter)
(
web_async::time::Instant::now(),
web_async::time::SystemTime::now() - jitter,
)
});

// Convert an Instant to a Unix timestamp
impl<const SCALE: u64> From<std::time::Instant> for Timescale<SCALE> {
fn from(instant: std::time::Instant) -> Self {
// Convert an Instant to a Unix timestamp.
impl<const SCALE: u64> From<web_async::time::Instant> for Timescale<SCALE> {
fn from(instant: web_async::time::Instant) -> Self {
let (anchor_instant, anchor_system) = *TIME_ANCHOR;

// Conver the instant to a SystemTime.
Expand All @@ -311,19 +311,13 @@ impl<const SCALE: u64> From<std::time::Instant> for Timescale<SCALE> {
// Convert the SystemTime to a Unix timestamp in nanoseconds.
// We'll then convert that to the desired scale.
system
.duration_since(UNIX_EPOCH)
.duration_since(web_async::time::UNIX_EPOCH)
.expect("dude your clock is earlier than 1970")
.try_into()
.expect("dude your clock is later than 2116")
}
}

impl<const SCALE: u64> From<tokio::time::Instant> for Timescale<SCALE> {
fn from(instant: tokio::time::Instant) -> Self {
instant.into_std().into()
}
}

impl<const SCALE: u64> Decode<crate::Version> for Timescale<SCALE> {
fn decode<R: bytes::Buf>(r: &mut R, version: crate::Version) -> Result<Self, DecodeError> {
let v = VarInt::decode(r, version)?;
Expand Down
8 changes: 4 additions & 4 deletions rs/moq-net/src/model/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Track {
#[derive(Default)]
struct State {
/// Groups in arrival order. `None` entries are tombstones for evicted groups.
groups: VecDeque<Option<(GroupProducer, tokio::time::Instant)>>,
groups: VecDeque<Option<(GroupProducer, web_async::time::Instant)>>,
duplicates: HashSet<u64>,
offset: usize,
max_sequence: Option<u64>,
Expand Down Expand Up @@ -174,7 +174,7 @@ impl State {
/// non-max_sequence group (everything after it arrived even later).
/// When max_sequence is at the front, we skip past it and tombstone expired groups
/// behind it.
fn evict_expired(&mut self, now: tokio::time::Instant) {
fn evict_expired(&mut self, now: web_async::time::Instant) {
for slot in self.groups.iter_mut() {
let Some((group, created_at)) = slot else { continue };

Expand Down Expand Up @@ -246,7 +246,7 @@ impl TrackProducer {
return Err(Error::Duplicate);
}

let now = tokio::time::Instant::now();
let now = web_async::time::Instant::now();
state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.sequence));
state.groups.push_back(Some((group.clone(), now)));
state.evict_expired(now);
Expand All @@ -269,7 +269,7 @@ impl TrackProducer {

let group = Group { sequence }.produce();

let now = tokio::time::Instant::now();
let now = web_async::time::Instant::now();
state.duplicates.insert(sequence);
state.max_sequence = Some(sequence);
state.groups.push_back(Some((group.clone(), now)));
Expand Down
11 changes: 6 additions & 5 deletions rs/moq-net/src/session.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use web_async::MaybeSendBoxFuture;
use web_transport_trait::Stats;

use crate::{BandwidthConsumer, BandwidthProducer, Error, Version};
Expand Down Expand Up @@ -113,7 +114,7 @@ async fn run_send_bandwidth_inner<S: web_transport_trait::Session>(session: &S,
return;
}

let mut interval = tokio::time::interval(POLL_INTERVAL);
let mut interval = web_async::time::interval(POLL_INTERVAL);
loop {
tokio::select! {
biased;
Expand All @@ -136,17 +137,17 @@ async fn run_send_bandwidth_inner<S: web_transport_trait::Session>(session: &S,
}

// We use a wrapper type that is dyn-compatible to remove the generic bounds from Session.
trait SessionInner: Send + Sync {
trait SessionInner: web_transport_trait::MaybeSend + web_transport_trait::MaybeSync {
fn close(&self, code: u32, reason: &str);
fn closed(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>>;
fn closed(&self) -> MaybeSendBoxFuture<'_, String>;
}

impl<S: web_transport_trait::Session> SessionInner for S {
fn close(&self, code: u32, reason: &str) {
S::close(self, code, reason);
}

fn closed(&self) -> Pin<Box<dyn Future<Output = String> + Send + '_>> {
fn closed(&self) -> MaybeSendBoxFuture<'_, String> {
Box::pin(async move { S::closed(self).await.to_string() })
}
}
4 changes: 2 additions & 2 deletions rs/moq-net/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1235,8 +1235,8 @@ async fn run_publisher(
drop(shared);
}

let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut ticker = web_async::time::interval(interval);
ticker.set_missed_tick_behavior(web_async::time::MissedTickBehavior::Delay);

loop {
ticker.tick().await;
Expand Down
62 changes: 62 additions & 0 deletions rs/moq-net/tests/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! wasm32 model-layer tests.
//!
//! moq-net's model layer (Origin/Broadcast/Track/Group/Frame) is transport-
//! independent, so it can be exercised in-process on wasm without a
//! WebTransport session. This covers both directions (produce + consume) plus
//! the wasm timestamp clock that the producer path depends on.
//!
//! Run (bypassing `wasm-pack test`, which builds the crate's native-only lib
//! unit tests too. They use `tokio::spawn` and don't compile on wasm):
//!
//! ```sh
//! CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_RUNNER=wasm-bindgen-test-runner \
//! RUSTFLAGS='--cfg=web_sys_unstable_apis --cfg=getrandom_backend="wasm_js"' \
//! cargo test --test wasm -p moq-net --target wasm32-unknown-unknown
//! ```
//!
//! Runs under Node (default). `performance.now()` / `Date.now()` back the
//! clock there just as in a browser; these model-layer tests need no
//! WebTransport. Add `wasm_bindgen_test_configure!(run_in_browser)` to run under
//! headless Chrome (the subscriber's real environment) once chromedriver is set.
#![cfg(target_arch = "wasm32")]

use bytes::Bytes;
use moq_net::{Broadcast, Time, Track};
use wasm_bindgen_test::*;

/// The producer timestamp clock works on wasm: `Time::now()` (which flows through
/// the web_async clock) returns a sane, non-decreasing wall-clock time.
/// On the old code this panicked (`std::time` has no clock on wasm32).
#[wasm_bindgen_test]
fn timescale_now_is_sane_and_monotonic() {
let a = Time::now();
let b = Time::now();

// A real post-2020 wall-clock time in ms (2020-01-01 = 1_577_836_800_000).
// Proves the web_async wall clock actually resolved.
assert!(
a.as_millis() > 1_577_836_800_000,
"timestamp before 2020. wall clock not working: {}",
a.as_millis()
);
// Monotonic non-decreasing.
assert!(b >= a, "time went backwards: {} < {}", b.as_millis(), a.as_millis());
}

/// Bidirectional model round-trip in-process on wasm: produce a track + frame,
/// then consume it back. Exercises the produce path (which stamps groups via the
/// wasm `web_async::time` clock) and the consume path together.
#[wasm_bindgen_test]
async fn produce_consume_frame_roundtrip() {
let mut broadcast = Broadcast::new().produce();
let mut track = broadcast.create_track(Track::new("stream")).unwrap();
let consumer = broadcast.consume();
let mut sub = consumer.subscribe_track(&Track::new("stream")).unwrap();

// Producer side: write a frame (creates a group, timestamped via the wasm clock).
track.write_frame(Bytes::from_static(b"hello-wasm")).unwrap();

// Consumer side: read it back.
let frame = sub.read_frame().await.unwrap();
assert_eq!(frame.as_deref(), Some(&b"hello-wasm"[..]), "frame did not round-trip");
}
Loading