Skip to content

Commit 339f250

Browse files
[mutlicast] DDM multicast exchange: V4 protocol, MRIB sync, M2P programming
Adds multicast group subscription distribution to the DDM exchange protocol with a V4 version bump (frozen V3 types for wire compat). Key changes: - V4 exchange protocol with multicast support (V3 peers are unaffected) - UnderlayMulticastIpv6 validated newtype moved to mg-common (ff04::/64) (moved from rdb types) - MRIB->DDM sync in mg-lower/mrib.rs - OPTE M2P table programming on learned multicast routes - Atomic update_imported_mcast on Db (single lock for import/delete/diff, which is a bit different from the tunnel work) - Collapsed send_update dispatch - Shared pull handler helpers (collect_underlay_tunnel, collect_multicast) - MulticastPathHop constructor - Some serde round-trip and validation tests, including for version handling Stacked on zl/mrib (MRIB: Multicast RIB implementation [#675](#675)).
1 parent b73c104 commit 339f250

24 files changed

Lines changed: 2168 additions & 102 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ddm-admin-client/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,25 @@ impl std::hash::Hash for types::TunnelOrigin {
3939
self.metric.hash(state);
4040
}
4141
}
42+
43+
impl std::cmp::PartialEq for types::MulticastOrigin {
44+
fn eq(&self, other: &Self) -> bool {
45+
self.overlay_group.eq(&other.overlay_group)
46+
&& self.underlay_group.eq(&other.underlay_group)
47+
&& self.vni.eq(&other.vni)
48+
&& self.source.eq(&other.source)
49+
}
50+
}
51+
52+
impl std::cmp::Eq for types::MulticastOrigin {}
53+
54+
/// Metric is excluded from identity so that metric changes update
55+
/// an existing entry rather than creating a duplicate.
56+
impl std::hash::Hash for types::MulticastOrigin {
57+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
58+
self.overlay_group.hash(state);
59+
self.underlay_group.hash(state);
60+
self.vni.hash(state);
61+
self.source.hash(state);
62+
}
63+
}

ddm-api/src/lib.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use dropshot::Path;
1010
use dropshot::RequestContext;
1111
use dropshot::TypedBody;
1212
use dropshot_api_manager_types::api_versions;
13-
use mg_common::net::TunnelOrigin;
13+
use mg_common::net::{MulticastOrigin, TunnelOrigin};
1414
use oxnet::Ipv6Net;
1515
use std::collections::{HashMap, HashSet};
1616

@@ -26,6 +26,7 @@ api_versions!([
2626
// | example for the next person.
2727
// v
2828
// (next_int, IDENT),
29+
(2, MULTICAST_SUPPORT),
2930
(1, INITIAL),
3031
]);
3132

@@ -100,6 +101,44 @@ pub trait DdmAdminApi {
100101
request: TypedBody<HashSet<TunnelOrigin>>,
101102
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
102103

104+
#[endpoint {
105+
method = GET,
106+
path = "/originated_multicast_groups",
107+
versions = VERSION_MULTICAST_SUPPORT..
108+
}]
109+
async fn get_originated_multicast_groups(
110+
ctx: RequestContext<Self::Context>,
111+
) -> Result<HttpResponseOk<HashSet<MulticastOrigin>>, HttpError>;
112+
113+
#[endpoint {
114+
method = GET,
115+
path = "/multicast_groups",
116+
versions = VERSION_MULTICAST_SUPPORT..
117+
}]
118+
async fn get_multicast_groups(
119+
ctx: RequestContext<Self::Context>,
120+
) -> Result<HttpResponseOk<HashSet<latest::db::MulticastRoute>>, HttpError>;
121+
122+
#[endpoint {
123+
method = PUT,
124+
path = "/multicast_group",
125+
versions = VERSION_MULTICAST_SUPPORT..
126+
}]
127+
async fn advertise_multicast_groups(
128+
ctx: RequestContext<Self::Context>,
129+
request: TypedBody<HashSet<MulticastOrigin>>,
130+
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
131+
132+
#[endpoint {
133+
method = DELETE,
134+
path = "/multicast_group",
135+
versions = VERSION_MULTICAST_SUPPORT..
136+
}]
137+
async fn withdraw_multicast_groups(
138+
ctx: RequestContext<Self::Context>,
139+
request: TypedBody<HashSet<MulticastOrigin>>,
140+
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
141+
103142
#[endpoint { method = PUT, path = "/sync" }]
104143
async fn sync(
105144
ctx: RequestContext<Self::Context>,

ddm-types/versions/src/latest.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ pub mod db {
1515
pub use crate::v1::db::PeerStatus;
1616
pub use crate::v1::db::RouterKind;
1717
pub use crate::v1::db::TunnelRoute;
18+
pub use crate::v2::db::MulticastRoute;
1819
}
1920

2021
pub mod exchange {
2122
pub use crate::v1::exchange::PathVector;
2223
pub use crate::v1::exchange::PathVectorV2;
24+
pub use crate::v2::exchange::MulticastPathHop;
25+
pub use crate::v2::exchange::MulticastPathVector;
2326
}

ddm-types/versions/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,5 @@
3232
pub mod latest;
3333
#[path = "initial/mod.rs"]
3434
pub mod v1;
35+
#[path = "multicast_support/mod.rs"]
36+
pub mod v2;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
use std::net::Ipv6Addr;
6+
7+
use mg_common::net::MulticastOrigin;
8+
use schemars::JsonSchema;
9+
use serde::{Deserialize, Serialize};
10+
11+
use crate::v2::exchange::MulticastPathHop;
12+
13+
/// A multicast route learned via DDM.
14+
///
15+
/// Carries both the group origin and the path vector from the
16+
/// originating subscriber through intermediate transit routers.
17+
/// The path enables loop detection and (in multi-rack topologies)
18+
/// replication optimizations per [RFD 488] in the future.
19+
///
20+
/// Equality and hashing consider only `origin` and `nexthop` so that
21+
/// a route update with a longer path replaces the existing entry in
22+
/// hash-based collections.
23+
///
24+
/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488
25+
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
26+
pub struct MulticastRoute {
27+
/// The multicast group origin information.
28+
pub origin: MulticastOrigin,
29+
30+
/// Underlay nexthop address (DDM peer that advertised this route).
31+
/// Used to associate the route with a peer for expiration.
32+
pub nexthop: Ipv6Addr,
33+
34+
/// Path vector from the originating subscriber outward.
35+
/// Each hop records the router that redistributed this
36+
/// subscription announcement. Used for loop detection on pull
37+
/// and for future replication optimization in multi-rack
38+
/// topologies.
39+
#[serde(default)]
40+
pub path: Vec<MulticastPathHop>,
41+
}
42+
43+
impl PartialEq for MulticastRoute {
44+
fn eq(&self, other: &Self) -> bool {
45+
self.origin == other.origin && self.nexthop == other.nexthop
46+
}
47+
}
48+
49+
impl Eq for MulticastRoute {}
50+
51+
impl std::hash::Hash for MulticastRoute {
52+
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
53+
self.origin.hash(state);
54+
self.nexthop.hash(state);
55+
}
56+
}
57+
58+
impl From<MulticastRoute> for MulticastOrigin {
59+
fn from(x: MulticastRoute) -> Self {
60+
x.origin
61+
}
62+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
use schemars::JsonSchema;
6+
use serde::{Deserialize, Serialize};
7+
use std::net::Ipv6Addr;
8+
9+
/// A single hop in the multicast path, carrying metadata needed for
10+
/// replication optimization.
11+
///
12+
/// Unlike unicast paths which only need hostnames, multicast hops carry
13+
/// additional information for computing optimal replication points per
14+
/// [RFD 488].
15+
///
16+
/// [RFD 488]: https://rfd.shared.oxide.computer/rfd/0488
17+
#[derive(
18+
Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema,
19+
)]
20+
pub struct MulticastPathHop {
21+
/// Router identifier (hostname).
22+
pub router_id: String,
23+
24+
/// The underlay address of this router (for replication targeting).
25+
pub underlay_addr: Ipv6Addr,
26+
27+
/// Number of downstream subscribers reachable via this hop.
28+
/// Used for load-aware replication decisions in multi-rack
29+
/// topologies.
30+
#[serde(default)]
31+
pub downstream_subscriber_count: u32,
32+
}
33+
34+
impl MulticastPathHop {
35+
/// Create a hop with the given router identity and a zero subscriber
36+
/// count. The count will be populated once transit routers track
37+
/// downstream subscriber counts for load-aware replication (RFD 488).
38+
pub fn new(router_id: String, underlay_addr: Ipv6Addr) -> Self {
39+
Self {
40+
router_id,
41+
underlay_addr,
42+
downstream_subscriber_count: 0,
43+
}
44+
}
45+
}
46+
47+
/// Multicast group subscription announcement propagating through DDM.
48+
///
49+
/// The path records the sequence of routers from the original subscriber
50+
/// toward the current receiving router. Currently, this is used for loop
51+
/// detection: if our router_id appears in the path, the announcement has
52+
/// already traversed us and is dropped. The path structure also carries
53+
/// topology information for future replication optimizations (RFD 488).
54+
#[derive(
55+
Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize, JsonSchema,
56+
)]
57+
pub struct MulticastPathVector {
58+
/// The multicast group origin information.
59+
pub origin: mg_common::net::MulticastOrigin,
60+
61+
/// The path from the original subscriber to the current router.
62+
/// Ordered from subscriber outward (subscriber router first).
63+
pub path: Vec<MulticastPathHop>,
64+
}
65+
66+
impl MulticastPathVector {
67+
/// Append a hop to this path vector.
68+
pub fn with_hop(&self, hop: MulticastPathHop) -> Self {
69+
let mut path = self.path.clone();
70+
path.push(hop);
71+
Self {
72+
origin: self.origin.clone(),
73+
path,
74+
}
75+
}
76+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
//! Types from API version 2 (MULTICAST_SUPPORT) that add multicast
6+
//! group management to the DDM admin API.
7+
8+
pub mod db;
9+
pub mod exchange;

ddm/src/admin.rs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::sm::{AdminEvent, Event, PrefixSet, SmContext};
77
use ddm_api::DdmAdminApi;
88
use ddm_api::ddm_admin_api_mod;
99
use ddm_types::admin::{EnableStatsRequest, ExpirePathParams, PrefixMap};
10-
use ddm_types::db::{PeerInfo, TunnelRoute};
10+
use ddm_types::db::{MulticastRoute, PeerInfo, TunnelRoute};
1111
use ddm_types::exchange::PathVector;
1212
use dropshot::ApiDescription;
1313
use dropshot::ApiDescriptionBuildErrors;
@@ -21,7 +21,7 @@ use dropshot::Path;
2121
use dropshot::RequestContext;
2222
use dropshot::TypedBody;
2323
use mg_common::lock;
24-
use mg_common::net::TunnelOrigin;
24+
use mg_common::net::{MulticastOrigin, TunnelOrigin};
2525
use oxnet::Ipv6Net;
2626
use slog::{Logger, error, info};
2727
use std::collections::{HashMap, HashSet};
@@ -333,6 +333,71 @@ impl DdmAdminApi for DdmAdminApiImpl {
333333
Ok(HttpResponseUpdatedNoContent())
334334
}
335335

336+
async fn get_originated_multicast_groups(
337+
ctx: RequestContext<Self::Context>,
338+
) -> Result<HttpResponseOk<HashSet<MulticastOrigin>>, HttpError> {
339+
let ctx = lock!(ctx.context());
340+
let originated = ctx
341+
.db
342+
.originated_mcast()
343+
.map_err(|e| HttpError::for_internal_error(e.to_string()))?;
344+
Ok(HttpResponseOk(originated))
345+
}
346+
347+
async fn get_multicast_groups(
348+
ctx: RequestContext<Self::Context>,
349+
) -> Result<HttpResponseOk<HashSet<MulticastRoute>>, HttpError> {
350+
let ctx = lock!(ctx.context());
351+
let imported = ctx.db.imported_mcast();
352+
Ok(HttpResponseOk(imported))
353+
}
354+
355+
async fn advertise_multicast_groups(
356+
ctx: RequestContext<Self::Context>,
357+
request: TypedBody<HashSet<MulticastOrigin>>,
358+
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
359+
let ctx = lock!(ctx.context());
360+
let groups = request.into_inner();
361+
slog::info!(ctx.log, "advertise multicast groups: {groups:#?}");
362+
ctx.db
363+
.originate_mcast(&groups)
364+
.map_err(|e| HttpError::for_internal_error(e.to_string()))?;
365+
366+
for e in &ctx.event_channels {
367+
e.send(Event::Admin(AdminEvent::Announce(PrefixSet::Multicast(
368+
groups.clone(),
369+
))))
370+
.map_err(|e| {
371+
HttpError::for_internal_error(format!("admin event send: {e}"))
372+
})?;
373+
}
374+
375+
Ok(HttpResponseUpdatedNoContent())
376+
}
377+
378+
async fn withdraw_multicast_groups(
379+
ctx: RequestContext<Self::Context>,
380+
request: TypedBody<HashSet<MulticastOrigin>>,
381+
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
382+
let ctx = lock!(ctx.context());
383+
let groups = request.into_inner();
384+
slog::info!(ctx.log, "withdraw multicast groups: {groups:#?}");
385+
ctx.db
386+
.withdraw_mcast(&groups)
387+
.map_err(|e| HttpError::for_internal_error(e.to_string()))?;
388+
389+
for e in &ctx.event_channels {
390+
e.send(Event::Admin(AdminEvent::Withdraw(PrefixSet::Multicast(
391+
groups.clone(),
392+
))))
393+
.map_err(|e| {
394+
HttpError::for_internal_error(format!("admin event send: {e}"))
395+
})?;
396+
}
397+
398+
Ok(HttpResponseUpdatedNoContent())
399+
}
400+
336401
async fn sync(
337402
ctx: RequestContext<Self::Context>,
338403
) -> Result<HttpResponseUpdatedNoContent, HttpError> {

0 commit comments

Comments
 (0)