Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions ngraph/failure_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses import dataclass, field
from random import random


@dataclass(slots=True)
class FailurePolicy:
"""
Mapping from element tag to failure probability.
"""

failure_probabilities: dict[str, float] = field(default_factory=dict)
distribution: str = "uniform"

def test_failure(self, tag: str) -> bool:
if self.distribution == "uniform":
return random() < self.failure_probabilities.get(tag, 0)
else:
raise ValueError(f"Unsupported distribution: {self.distribution}")
267 changes: 81 additions & 186 deletions ngraph/network.py
Original file line number Diff line number Diff line change
@@ -1,207 +1,102 @@
from __future__ import annotations
import uuid
from typing import Any, Dict, List, Optional, NamedTuple, Hashable
import concurrent.futures
import base64
from dataclasses import dataclass, field
from typing import Any, Dict


from ngraph.lib.graph import MultiDiGraph
from ngraph.lib.common import init_flow_graph
from ngraph.lib.max_flow import calc_max_flow
def new_base64_uuid() -> str:
"""
Generate a Base64-encoded UUID without padding (~22 characters).
"""
return base64.urlsafe_b64encode(uuid.uuid4().bytes).decode("ascii").rstrip("=")


class LinkID(NamedTuple):
src_node: Hashable
dst_node: Hashable
unique_id: Hashable
@dataclass(slots=True)
class Node:
"""
Represents a node in the network.

Each node is uniquely identified by its name, which is used as the key
in the Network's node dictionary.

class Node:
def __init__(self, node_id: str, node_type: str = "simple", **attributes: Dict):
self.node_id: str = node_id
self.node_type: str = node_type
self.attributes: Dict[str, Any] = {
"node_id": node_id,
"node_type": node_type,
"plane_ids": [],
"total_link_capacity": 0,
"non_transit": False,
"transit_only": False, # no local sinks/sources
"lat": 0,
"lon": 0,
}
self.update_attributes(**attributes)
self.sub_nodes: Dict[str, "Node"] = {} # Used if node_type is 'composite'
self.sub_links: Dict[str, "Link"] = {} # Used if node_type is 'composite'

def add_sub_node(self, sub_node_id: str, **attributes: Any):
# Logic to add a sub-node to a composite node
...

def add_sub_link(
self, sub_link_id: str, sub_node1: str, sub_node2: str, **attributes: Any
):
# Logic to add a sub-link to a composite node
...

def update_attributes(self, **attributes: Any):
"""
Update the attributes of the node.
"""
self.attributes.update(attributes)
:param name: The unique name of the node.
:param attrs: Optional extra metadata for the node.
"""

name: str
attrs: Dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class Link:
def __init__(
self,
node1: str,
node2: str,
link_id: Optional[LinkID] = None,
**attributes: Dict,
):
self.link_id: str = (
LinkID(node1, node2, str(uuid.uuid4())) if link_id is None else link_id
)
self.node1: str = node1
self.node2: str = node2
self.attributes: Dict[str, Any] = {
"link_id": self.link_id,
"node1": node1,
"node2": node2,
"plane_ids": [],
"capacity": 0,
"metric": 0,
"distance": 0,
}
self.update_attributes(**attributes)

def update_attributes(self, **attributes: Any):
"""
Represents a link connecting two nodes in the network.

The 'source' and 'target' fields reference node names. A unique link ID
is auto-generated from the source, target, and a random Base64-encoded UUID,
allowing multiple distinct links between the same nodes.

:param source: Unique name of the source node.
:param target: Unique name of the target node.
:param capacity: Link capacity (default 1.0).
:param latency: Link latency (default 1.0).
:param cost: Link cost (default 1.0).
:param attrs: Optional extra metadata for the link.
:param id: Auto-generated unique link identifier.
"""

source: str
target: str
capacity: float = 1.0
latency: float = 1.0
cost: float = 1.0
attrs: Dict[str, Any] = field(default_factory=dict)
id: str = field(init=False)

def __post_init__(self) -> None:
"""
Update the attributes of the link.
Auto-generate a unique link ID by combining the source, target,
and a random Base64-encoded UUID.
"""
self.attributes.update(attributes)
self.id = f"{self.source}-{self.target}-{new_base64_uuid()}"


@dataclass(slots=True)
class Network:
def __init__(self):
self.planes: Dict[str, MultiDiGraph] = {} # Key is plane_id
self.nodes: Dict[str, Node] = {} # Key is unique node_id
self.links: Dict[str, Link] = {} # Key is unique link_id
"""
A container for network nodes and links.

@staticmethod
def generate_edge_id(from_node: str, to_node: str, link_id: LinkID) -> str:
"""
Generate a unique edge ID for a link between two nodes.
"""
return LinkID(from_node, to_node, link_id[2])

def add_plane(self, plane_id: str):
self.planes[plane_id] = init_flow_graph(MultiDiGraph())

def add_node(
self,
node_id: str,
plane_ids: Optional[List[str]] = None,
node_type: str = "simple",
**attributes: Any,
) -> str:
new_node = Node(node_id, node_type, **attributes)
self.nodes[new_node.node_id] = new_node

if plane_ids is None:
plane_ids = self.planes.keys()

for plane_id in plane_ids:
self.planes[plane_id].add_node(new_node.node_id, **attributes)
new_node.attributes["plane_ids"].append(plane_id)
return new_node.node_id

def add_link(
self,
node1: str,
node2: str,
plane_ids: Optional[List[str]] = None,
**attributes: Any,
) -> str:
new_link = Link(node1, node2, **attributes)
self.links[new_link.link_id] = new_link

if plane_ids is None:
plane_ids = self.planes.keys()

for plane_id in plane_ids:
self.planes[plane_id].add_edge(
node1,
node2,
edge_id=self.generate_edge_id(node1, node2, new_link.link_id),
capacity=new_link.attributes["capacity"] / len(plane_ids),
metric=new_link.attributes["metric"],
)
self.planes[plane_id].add_edge(
node2,
node1,
edge_id=self.generate_edge_id(node2, node1, new_link.link_id),
capacity=new_link.attributes["capacity"] / len(plane_ids),
metric=new_link.attributes["metric"],
)
new_link.attributes["plane_ids"].append(plane_id)

# Update the total link capacity of the nodes
self.nodes[node1].attributes["total_link_capacity"] += new_link.attributes[
"capacity"
]
self.nodes[node2].attributes["total_link_capacity"] += new_link.attributes[
"capacity"
]
return new_link.link_id

@staticmethod
def plane_max_flow(plane_id, plane_graph, src_node, dst_nodes) -> Optional[float]:
Nodes are stored in a dictionary keyed by their unique names.
Links are stored in a dictionary keyed by their auto-generated IDs.
The 'attrs' dict allows extra network metadata.

:param nodes: Mapping from node name to Node.
:param links: Mapping from link id to Link.
:param attrs: Optional extra metadata for the network.
"""

nodes: Dict[str, Node] = field(default_factory=dict)
links: Dict[str, Link] = field(default_factory=dict)
attrs: Dict[str, Any] = field(default_factory=dict)

def add_node(self, node: Node) -> None:
"""
Calculate the maximum flow between src and dst for a single plane.
There can be multiple dst nodes, they all are attached to the same virtual sink node.
Add a node to the network, keyed by its name.

:param node: The Node to add.
"""
if src_node in plane_graph:
for dst_node in dst_nodes:
if dst_node in plane_graph:
# add a pseudo node to the graph to act as the sink for the max flow calculation
plane_graph.add_edge(
dst_node,
"sink",
edge_id=-1,
capacity=2**32,
metric=0,
flow=0,
flows={},
)
if "sink" in plane_graph:
return calc_max_flow(plane_graph, src_node, "sink")

def calc_max_flow(
self, src_nodes: List[str], dst_nodes: List[str]
) -> Dict[str, Dict[str, float]]:
self.nodes[node.name] = node

def add_link(self, link: Link) -> None:
"""
Calculate the maximum flow between each of the src nodes and all of the dst nodes.
All the dst nodes are attached to the same virtual sink node.
Runs the calculation in parallel for all planes and src nodes.
Add a link to the network. Both source and target nodes must exist.

:param link: The Link to add.
:raises ValueError: If the source or target node is not present.
"""
with concurrent.futures.ProcessPoolExecutor() as executor:
future_to_plane_source = {}
for plane_id in self.planes:
for src_node in src_nodes:
future_to_plane_source[
executor.submit(
self.plane_max_flow,
plane_id,
self.planes[plane_id],
src_node,
dst_nodes,
)
] = (plane_id, src_node, dst_nodes)

results = {}
for future in concurrent.futures.as_completed(future_to_plane_source):
plane_id, src_node, dst_nodes = future_to_plane_source[future]
results.setdefault(src_node, {})
results[src_node].setdefault(tuple(dst_nodes), {})
results[src_node][tuple(dst_nodes)][plane_id] = future.result()
return results
if link.source not in self.nodes:
raise ValueError(f"Source node '{link.source}' not found in network.")
if link.target not in self.nodes:
raise ValueError(f"Target node '{link.target}' not found in network.")
self.links[link.id] = link
56 changes: 56 additions & 0 deletions ngraph/results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(slots=True)
class Results:
"""
A container for storing arbitrary key-value data that arises during workflow steps.
The data is organized by step name, then by key.

Example usage:
results.put("Step1", "total_capacity", 123.45)
cap = results.get("Step1", "total_capacity") # returns 123.45
all_caps = results.get_all("total_capacity") # might return {"Step1": 123.45, "Step2": 98.76}
"""

# Internally, store per-step data in a nested dict:
# _store[step_name][key] = value
_store: Dict[str, Dict[str, Any]] = field(default_factory=dict)

def put(self, step_name: str, key: str, value: Any) -> None:
"""
Store a value under (step_name, key).
If the step_name sub-dict does not exist, it is created.

:param step_name: The workflow step that produced the result.
:param key: A short label describing the data (e.g. "total_capacity").
:param value: The actual data to store (can be any Python object).
"""
if step_name not in self._store:
self._store[step_name] = {}
self._store[step_name][key] = value

def get(self, step_name: str, key: str, default: Any = None) -> Any:
"""
Retrieve the value from (step_name, key). If the key is missing, return `default`.

:param step_name: The workflow step name.
:param key: The key under which the data was stored.
:param default: Value to return if the (step_name, key) is not present.
:return: The data, or `default` if not found.
"""
return self._store.get(step_name, {}).get(key, default)

def get_all(self, key: str) -> Dict[str, Any]:
"""
Retrieve a dictionary of {step_name: value} for all step_names that contain the specified key.

:param key: The key to look up in each step.
:return: A dict mapping step_name -> value for all steps that have stored something under 'key'.
"""
result = {}
for step_name, data in self._store.items():
if key in data:
result[step_name] = data[key]
return result
Loading