Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions js/hang/src/catalog/audio.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as z from "zod/mini";
import { ContainerSchema } from "./container";
import { u53Schema } from "./integers";
import { RelativeBroadcastSchema } from "./path";

// Backwards compatibility: old track schema
const TrackSchema = z.object({
Expand All @@ -10,6 +11,11 @@ const TrackSchema = z.object({
// Mirrors AudioDecoderConfig
// https://w3c.github.io/webcodecs/#audio-decoder-config
export const AudioConfigSchema = z.object({
// Optional reference to another broadcast that publishes this track, expressed
// relative to the broadcast that served this catalog (e.g. "../source").
// If unset, the track lives in the same broadcast as the catalog.
broadcast: z.optional(RelativeBroadcastSchema),

// See: https://w3c.github.io/webcodecs/codec_registry.html
codec: z.string(),

Expand Down
1 change: 1 addition & 0 deletions js/hang/src/catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from "./container";
export * from "./format";
export * from "./integers";
export * from "./location";
export * from "./path";
export * from "./preview";
export * from "./priority";
export * from "./root";
Expand Down
60 changes: 60 additions & 0 deletions js/hang/src/catalog/path.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { expect, test } from "bun:test";
import { Path } from "@moq/net";
import { normalizeRelativeBroadcast, resolveBroadcast } from "./path.ts";

test("resolveBroadcast appends named segments", () => {
const base = Path.from("a/b");
expect(resolveBroadcast(base, "c")).toBe(Path.from("a/b/c"));
expect(resolveBroadcast(base, "c/d")).toBe(Path.from("a/b/c/d"));
});

test("resolveBroadcast with empty rel returns base", () => {
expect(resolveBroadcast(Path.from("a/b"), "")).toBe(Path.from("a/b"));
});

test("resolveBroadcast single dotdot pops one segment", () => {
const base = Path.from("a/b/c");
expect(resolveBroadcast(base, "../d")).toBe(Path.from("a/b/d"));
expect(resolveBroadcast(base, "..")).toBe(Path.from("a/b"));
});

test("resolveBroadcast multiple dotdot pops multiple segments", () => {
const base = Path.from("a/b/c");
expect(resolveBroadcast(base, "../../x")).toBe(Path.from("a/x"));
expect(resolveBroadcast(base, "../../../x")).toBe(Path.from("x"));
});

test("resolveBroadcast excess dotdot clamps at empty", () => {
const base = Path.from("a");
expect(resolveBroadcast(base, "../../../foo")).toBe(Path.from("foo"));
expect(resolveBroadcast(base, "..")).toBe(Path.from(""));
});

test("resolveBroadcast with empty base", () => {
const base = Path.from("");
expect(resolveBroadcast(base, "foo")).toBe(Path.from("foo"));
expect(resolveBroadcast(base, "..")).toBe(Path.from(""));
});

test("resolveBroadcast treats dot as a no-op", () => {
const base = Path.from("a/b");
expect(resolveBroadcast(base, ".")).toBe(Path.from("a/b"));
expect(resolveBroadcast(base, "./c")).toBe(Path.from("a/b/c"));
expect(resolveBroadcast(base, "./../c")).toBe(Path.from("a/c"));
expect(resolveBroadcast(base, "foo/./bar")).toBe(Path.from("a/b/foo/bar"));
});

test("resolveBroadcast self-reference via dotdot equals base", () => {
const base = Path.from("a/b");
expect(resolveBroadcast(base, "../b")).toBe(base);
});

test("normalizeRelativeBroadcast drops empty and dot segments", () => {
expect(normalizeRelativeBroadcast("")).toBe("");
expect(normalizeRelativeBroadcast(".")).toBe("");
expect(normalizeRelativeBroadcast("./foo")).toBe("foo");
expect(normalizeRelativeBroadcast("foo//bar")).toBe("foo/bar");
expect(normalizeRelativeBroadcast("foo/./bar")).toBe("foo/bar");
expect(normalizeRelativeBroadcast("/foo/")).toBe("foo");
expect(normalizeRelativeBroadcast("../foo")).toBe("../foo");
});
58 changes: 58 additions & 0 deletions js/hang/src/catalog/path.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Path } from "@moq/net";
import * as z from "zod/mini";

/**
* Normalize a relative broadcast string the way Rust `PathRelative::new` does: trim
* leading/trailing slashes, drop empty segments, and drop `.` segments. `..` is preserved
* and only interpreted by `resolveBroadcast`.
*
* Returns the normalized form. Two callers comparing normalized strings can detect that
* `""`, `"."`, `"/./"` etc. all mean "no override".
*/
export function normalizeRelativeBroadcast(rel: string): string {
return rel
.split("/")
.filter((s) => s !== "" && s !== ".")
.join("/");
}

/**
* Resolve a relative broadcast reference against the path of the broadcast that served the catalog.
*
* `..` segments pop the last segment of the base path; other segments are appended.
* `.` and empty segments are no-ops. Excess `..` once the base is empty is also a no-op
* (subsequent named segments still append). An empty / normalized-empty `rel` returns the
* base path unchanged.
*
* Mirrors the Rust `Path::resolve(&PathRelative)` helper used by hang catalogs to express
* cross-broadcast track references.
*
* @example
* ```typescript
* resolveBroadcast(Path.from("a/b/c"), "../source"); // "a/b/source"
* resolveBroadcast(Path.from("a/b"), "x/y"); // "a/b/x/y"
* resolveBroadcast(Path.from("a"), "../../x"); // "x"
* resolveBroadcast(Path.from("a/b"), "./c"); // "a/b/c"
* ```
*/
/**
* Zod schema for a relative broadcast reference stored in a catalog. Normalizes the input
* the same way Rust `PathRelative::new` does so JS and Rust agree byte-for-byte on what's
* stored in memory after deserialization.
*/
export const RelativeBroadcastSchema = z.pipe(z.string(), z.transform(normalizeRelativeBroadcast));

export function resolveBroadcast(base: Path.Valid, rel: string): Path.Valid {
const baseSegments = base === "" ? [] : base.split("/").filter((s) => s !== "");
const relSegments = rel.split("/").filter((s) => s !== "" && s !== ".");

for (const seg of relSegments) {
if (seg === "..") {
baseSegments.pop();
} else {
baseSegments.push(seg);
}
}

return Path.from(...baseSegments);
}
6 changes: 6 additions & 0 deletions js/hang/src/catalog/video.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as z from "zod/mini";
import { ContainerSchema } from "./container";
import { u53Schema } from "./integers";
import { RelativeBroadcastSchema } from "./path";

// Backwards compatibility: old track schema
const TrackSchema = z.object({
Expand All @@ -9,6 +10,11 @@ const TrackSchema = z.object({

// Based on VideoDecoderConfig
export const VideoConfigSchema = z.object({
// Optional reference to another broadcast that publishes this track, expressed
// relative to the broadcast that served this catalog (e.g. "../source").
// If unset, the track lives in the same broadcast as the catalog.
broadcast: z.optional(RelativeBroadcastSchema),

// See: https://w3c.github.io/webcodecs/codec_registry.html
codec: z.string(),

Expand Down
4 changes: 3 additions & 1 deletion js/watch/src/audio/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ export class Decoder {
const config = effect.get(this.source.config);
if (!config) return;

const active = effect.get(broadcast.active);
// Honor a per-rendition `broadcast` override: resolve to the source broadcast if set,
// otherwise use the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const sub = active.subscribe(track, Catalog.PRIORITY.audio);
Expand Down
8 changes: 5 additions & 3 deletions js/watch/src/audio/mse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,17 @@ export class Mse implements Backend {
const broadcast = effect.get(this.source.broadcast);
if (!broadcast) return;

const active = effect.get(broadcast.active);
if (!active) return;

const track = effect.get(this.source.track);
if (!track) return;

const config = effect.get(this.source.config);
if (!config) return;

// Honor a per-rendition `broadcast` override: resolve to the source broadcast if set,
// otherwise use the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const mime = `audio/mp4; codecs="${config.codec}"`;

const sourceBuffer = mediaSource.addSourceBuffer(mime);
Expand Down
68 changes: 60 additions & 8 deletions js/watch/src/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,28 @@ export class Broadcast {
}

#runAnnouncedNow(effect: Effect): void {
const name = effect.get(this.name);
this.#announcedNow.set(this.#isPathAnnounced(effect, name));
}

// Whether `name` is currently announced on the connection (or skipping the check
// because reload is off or the relay doesn't support announcements). Used by both
// `#runAnnouncedNow` (for `this.name`) and `#override` (for cross-broadcast refs).
#isPathAnnounced(effect: Effect, name: Path.Valid): boolean {
const reload = effect.get(this.reload);
if (!reload) {
this.#announcedNow.set(true);
return;
}
if (!reload) return true;

// Cloudflare's relay does not yet support announcement subscriptions,
// so an announcement will never arrive. Fall back to subscribing
// immediately (reload=false behaviour) instead of waiting forever.
const conn = effect.get(this.connection);
if (conn?.url.hostname.endsWith("mediaoverquic.com")) {
console.warn("Cloudflare relay does not support broadcast discovery yet; ignoring reload signal.");
this.#announcedNow.set(true);
return;
return true;
}

const name = effect.get(this.name);
const announced = effect.get(this.#announced);
this.#announcedNow.set(announced.has(name));
return announced.has(name);
}

#runBroadcast(effect: Effect): void {
Expand Down Expand Up @@ -184,6 +187,55 @@ export class Broadcast {
});
}

/**
* Resolve the `Moq.Broadcast` that publishes a given track.
*
* If `configBroadcast` is set, treat it as a path relative to this broadcast's name and
* subscribe to the resolved broadcast on the same connection. Otherwise return the catalog's
* own active broadcast.
*
* Override broadcasts are cached per resolved path and owned by this Broadcast's
* `signals`; the caller's `effect` only subscribes to the cached signal. This means
* many renditions referencing the same source share one underlying subscription, and
* the override outlives any single caller effect.
*/
trackBroadcast(effect: Effect, configBroadcast: string | undefined): Moq.Broadcast | undefined {
if (!configBroadcast) return effect.get(this.active);

const basePath = effect.get(this.name);
const resolved = Catalog.resolveBroadcast(basePath, configBroadcast);

// Self-reference (including `..` paths that walk back to the catalog's own path,
// and any rel that normalizes to empty): reuse the catalog's broadcast handle
// instead of opening a duplicate subscription on the same path.
if (resolved === basePath) return effect.get(this.active);

return effect.get(this.#override(resolved));
}

#overrides = new Map<Path.Valid, Signal<Moq.Broadcast | undefined>>();

#override(path: Path.Valid): Signal<Moq.Broadcast | undefined> {
const cached = this.#overrides.get(path);
if (cached) return cached;

const signal = new Signal<Moq.Broadcast | undefined>(undefined);
this.#overrides.set(path, signal);

this.signals.run((effect) => {
const conn = effect.get(this.connection);
if (!conn) return;

if (!this.#isPathAnnounced(effect, path)) return;

const remote = conn.consume(path);
effect.cleanup(() => remote.close());
effect.set(signal, remote, undefined);
});

return signal;
}

close() {
this.signals.close();
}
Expand Down
4 changes: 3 additions & 1 deletion js/watch/src/video/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ export class Decoder implements Backend {
}
const [_, source, track, config] = values;

const broadcast: Moq.Broadcast | undefined = effect.get(source.active);
// Honor a per-rendition `broadcast` override: subscribe on the resolved source
// broadcast instead of the catalog's broadcast. Falls back to the catalog's broadcast.
const broadcast: Moq.Broadcast | undefined = source.trackBroadcast(effect, config.broadcast);
if (!broadcast) {
// Going offline should clear the last rendered frame.
this.#active.set(undefined);
Expand Down
8 changes: 5 additions & 3 deletions js/watch/src/video/mse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,17 @@ export class Mse implements Backend {
const broadcast = effect.get(this.source.broadcast);
if (!broadcast) return;

const active = effect.get(broadcast.active);
if (!active) return;

const track = effect.get(this.source.track);
if (!track) return;

const config = effect.get(this.source.config);
if (!config) return;

// Honor a per-rendition `broadcast` override: resolve to the source broadcast if set,
// otherwise use the catalog's own broadcast.
const active = broadcast.trackBroadcast(effect, config.broadcast);
if (!active) return;

const mime = `video/mp4; codecs="${config.codec}"`;

const sourceBuffer = mediaSource.addSourceBuffer(mime);
Expand Down
24 changes: 23 additions & 1 deletion rs/hang/examples/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,38 @@ async fn run_subscribe(mut consumer: moq_net::OriginConsumer) -> anyhow::Result<
codec = %config.codec,
width = ?config.coded_width,
height = ?config.coded_height,
broadcast_override = ?config.broadcast.as_ref().map(|p| p.as_str()),
"subscribing to video track"
);

// If the rendition references a different broadcast (e.g. a source feed that this
// catalog only sidecars), resolve it relative to the catalog's broadcast path and
// wait for the announcement. Otherwise subscribe on the catalog's broadcast.
// Treat an empty rel, a rel that resolves back to our own path, or a rel that
// resolves to empty (excess `..`) as "no override" so we reuse the existing
// broadcast handle instead of opening a redundant or unrouteable subscription.
let track_broadcast = match config.broadcast.as_ref() {
Some(rel) if !rel.is_empty() => {
let resolved = path.resolve(rel);
if resolved.is_empty() || resolved == path {
broadcast.clone()
} else {
consumer
.announced_broadcast(&resolved)
.await
.ok_or_else(|| anyhow::anyhow!("source broadcast unavailable: {resolved}"))?
}
}
_ => broadcast.clone(),
};

// Subscribe to the video track.
let track = moq_net::Track {
name: name.clone(),
priority: 1,
};

let track_consumer = broadcast.subscribe_track(&track)?;
let track_consumer = track_broadcast.subscribe_track(&track)?;
let mut ordered = moq_mux::container::Consumer::new(track_consumer, moq_mux::catalog::hang::Container::Legacy)
.with_latency(Duration::from_millis(500));

Expand Down
7 changes: 7 additions & 0 deletions rs/hang/src/catalog/audio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ impl Audio {
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct AudioConfig {
/// Optional reference to another broadcast that publishes this track, expressed
/// relative to the broadcast that served this catalog. If unset, the track lives
/// in the same broadcast as the catalog.
#[serde(default)]
pub broadcast: Option<moq_net::PathRelativeOwned>,

// The codec, see the registry for details:
// https://w3c.github.io/webcodecs/codec_registry.html
#[serde_as(as = "DisplayFromStr")]
Expand Down Expand Up @@ -107,6 +113,7 @@ impl AudioConfig {
/// since the type is `#[non_exhaustive]`.
pub fn new(codec: impl Into<AudioCodec>, sample_rate: u32, channel_count: u32) -> Self {
Self {
broadcast: None,
codec: codec.into(),
sample_rate,
channel_count,
Expand Down
Loading
Loading