-
Notifications
You must be signed in to change notification settings - Fork 6
Migrate Edge Configuration to SDK #413
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5778c87
240f124
ab42ece
83fe037
3d2895d
5ca292e
7c2b321
ddb26a1
2f8a474
b3a7f66
8fcebb9
45feffa
18a33b2
dbe714f
0fbd05e
cb9394a
21c5cd8
a4469a8
963b35b
53af849
9c06701
db1d86b
8d4a491
79121f8
418eecf
b0b53db
bf0c284
d0608a1
83d8f75
8505790
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
from .config import (
    DEFAULT,
    DISABLED,
    EDGE_ANSWERS_WITH_ESCALATION,
    NO_CLOUD,
    DetectorConfig,
    DetectorsConfig,
    EdgeEndpointConfig,
    GlobalConfig,
    InferenceConfig,
)

# Public API of this package. Kept in the same alphabetical order as the
# import list above so the two stay easy to cross-check when names are added.
__all__ = [
    "DEFAULT",
    "DISABLED",
    "EDGE_ANSWERS_WITH_ESCALATION",
    "NO_CLOUD",
    "DetectorConfig",
    "DetectorsConfig",
    "EdgeEndpointConfig",
    "GlobalConfig",
    "InferenceConfig",
]
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,215 @@ | ||
| from typing import Any, Optional, Union | ||
|
|
||
| import yaml | ||
| from model import Detector | ||
| from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator | ||
| from typing_extensions import Self | ||
|
|
||
|
|
||
class GlobalConfig(BaseModel):
    """Global runtime settings for edge-endpoint behavior.

    These settings apply endpoint-wide, independent of any individual detector.
    """

    # Reject unknown keys so typos in user-supplied config fail loudly.
    model_config = ConfigDict(extra="forbid")

    # How often (seconds) the inference server polls for a new model binary.
    refresh_rate: float = Field(
        default=60.0,
        description="The interval (in seconds) at which the inference server checks for a new model binary update.",
    )
    # Probability that a confident prediction is escalated for cloud auditing.
    confident_audit_rate: float = Field(
        default=1e-5,  # A detector running at 1 FPS = ~100,000 IQ/day, so 1e-5 is ~1 confident IQ/day audited
        description="The probability that any given confident prediction will be sent to the cloud for auditing.",
    )
|
|
||
|
|
||
class InferenceConfig(BaseModel):
    """
    Configuration for edge inference on a specific detector.

    Instances are immutable presets identified by `name`; detectors reference
    them by that name via `DetectorConfig.edge_inference_config`.
    """

    # Keep shared presets immutable (DEFAULT/NO_CLOUD/etc.) so one mutation cannot globally change behavior.
    model_config = ConfigDict(extra="forbid", frozen=True)

    # `exclude=True`: the name is the mapping key in serialized payloads, so it
    # is not duplicated inside the dumped config body.
    name: str = Field(..., exclude=True, description="A unique name for this inference config preset.")
    enabled: bool = Field(
        default=True, description="Whether the edge endpoint should accept image queries for this detector."
    )
    api_token: Optional[str] = Field(
        default=None, description="API token used to fetch the inference model for this detector."
    )
    always_return_edge_prediction: bool = Field(
        default=False,
        description=(
            "Indicates if the edge-endpoint should always provide edge ML predictions, regardless of confidence. "
            "When this setting is true, whether or not the edge-endpoint should escalate low-confidence predictions "
            "to the cloud is determined by `disable_cloud_escalation`."
        ),
    )
    disable_cloud_escalation: bool = Field(
        default=False,
        description=(
            "Never escalate ImageQueries from the edge-endpoint to the cloud. "
            "Requires `always_return_edge_prediction=True`."
        ),
    )
    min_time_between_escalations: float = Field(
        default=2.0,
        description=(
            "The minimum time (in seconds) to wait between cloud escalations for a given detector. "
            "Cannot be less than 0.0. "
            "Only applies when `always_return_edge_prediction=True` and `disable_cloud_escalation=False`."
        ),
    )

    @model_validator(mode="after")
    def validate_configuration(self) -> Self:
        """Enforce cross-field constraints after all fields are populated.

        Raises:
            ValueError: If `disable_cloud_escalation` is set without
                `always_return_edge_prediction`, or if
                `min_time_between_escalations` is negative.
        """
        # Disabling escalation only makes sense when the edge always answers;
        # otherwise low-confidence queries would have nowhere to go.
        if self.disable_cloud_escalation and not self.always_return_edge_prediction:
            raise ValueError(
                "The `disable_cloud_escalation` flag is only valid when `always_return_edge_prediction` is set to True."
            )
        if self.min_time_between_escalations < 0.0:
            raise ValueError("`min_time_between_escalations` cannot be less than 0.0.")
        return self
|
|
||
|
|
||
class DetectorConfig(BaseModel):
    """
    Configuration for a specific detector.

    Binds a detector ID to a named inference-config preset; the preset itself
    lives in the parent config's `edge_inference_configs` mapping.
    """

    model_config = ConfigDict(extra="forbid")

    detector_id: str = Field(..., description="Detector ID")
    # Name of an entry in the parent's `edge_inference_configs` mapping
    # (referential integrity is checked by the parent's model validator).
    edge_inference_config: str = Field(..., description="Config for edge inference.")
|
|
||
|
|
||
class ConfigBase(BaseModel):
    """Shared detector/inference configuration behavior for edge config models.

    Holds a mapping of named inference-config presets plus the list of
    detectors referencing them, and validates that the two stay consistent.
    """

    model_config = ConfigDict(extra="forbid")

    # Preset name -> InferenceConfig. Keys must equal each config's `name`
    # (enforced in `validate_inference_configs` below).
    edge_inference_configs: dict[str, InferenceConfig] = Field(default_factory=dict)

    detectors: list[DetectorConfig] = Field(default_factory=list)

    @field_validator("edge_inference_configs", mode="before")
    @classmethod
    def hydrate_inference_config_names(
        cls, value: dict[str, InferenceConfig | dict[str, Any]] | None
    ) -> dict[str, InferenceConfig | dict[str, Any]]:
        """Hydrate InferenceConfig.name from payload mapping keys.

        YAML/dict payloads key each preset by name without repeating the name
        inside the body; this injects the mapping key as `name` so
        InferenceConfig validation can succeed.
        """
        if value is None:
            return {}
        # Non-dict input is passed through so pydantic reports its own
        # type error rather than this validator masking it.
        if not isinstance(value, dict):
            return value

        hydrated_configs: dict[str, InferenceConfig | dict[str, Any]] = {}
        for name, config in value.items():
            if isinstance(config, InferenceConfig):
                hydrated_configs[name] = config
                continue
            if not isinstance(config, dict):
                raise TypeError("Each edge inference config must be an object.")
            # NOTE: an explicit "name" inside the payload body takes precedence
            # over the mapping key here; any mismatch is then rejected by
            # `validate_inference_configs`.
            hydrated_configs[name] = {"name": name, **config}
        return hydrated_configs

    @model_validator(mode="after")
    def validate_inference_configs(self):
        """
        Validates detector config state.
        Raises ValueError if dict keys mismatch InferenceConfig.name, detector IDs are duplicated,
        or any detector references an undefined inference config.
        """
        # 1) Mapping keys must agree with each preset's own name.
        for name, config in self.edge_inference_configs.items():
            if name != config.name:
                raise ValueError(f"Edge inference config key '{name}' must match InferenceConfig.name '{config.name}'.")

        # 2) Detector IDs must be unique; collect all duplicates so the error
        # reports every offender at once rather than just the first.
        seen_detector_ids = set()
        duplicate_detector_ids = set()
        for detector_config in self.detectors:
            detector_id = detector_config.detector_id
            if detector_id in seen_detector_ids:
                duplicate_detector_ids.add(detector_id)
            else:
                seen_detector_ids.add(detector_id)
        if duplicate_detector_ids:
            duplicates = ", ".join(sorted(duplicate_detector_ids))
            raise ValueError(f"Duplicate detector IDs are not allowed: {duplicates}.")

        # 3) Every detector must reference a defined preset.
        for detector_config in self.detectors:
            if detector_config.edge_inference_config not in self.edge_inference_configs:
                raise ValueError(f"Edge inference config '{detector_config.edge_inference_config}' not defined.")
        return self

    def add_detector(self, detector: Union[str, Detector], edge_inference_config: InferenceConfig) -> None:
        """Add a detector with the given inference config. Accepts detector ID or Detector object.

        Registers the preset under its name if not yet present; re-registering
        the same name with a different config is rejected.

        Raises:
            ValueError: If the detector ID already exists, or a different
                preset is already registered under the same name.
        """
        detector_id = detector.id if isinstance(detector, Detector) else detector
        if any(existing.detector_id == detector_id for existing in self.detectors):
            raise ValueError(f"A detector with ID '{detector_id}' already exists.")

        existing = self.edge_inference_configs.get(edge_inference_config.name)
        if existing is None:
            self.edge_inference_configs[edge_inference_config.name] = edge_inference_config
        elif existing != edge_inference_config:
            # Equal configs are accepted idempotently; only conflicting
            # definitions under the same name are an error.
            raise ValueError(
                f"A different inference config named '{edge_inference_config.name}' is already registered."
            )

        self.detectors.append(DetectorConfig(detector_id=detector_id, edge_inference_config=edge_inference_config.name))

    def to_payload(self) -> dict[str, Any]:
        """Return this config as a payload dictionary."""
        # `name` fields are excluded from the dump (see InferenceConfig.name),
        # so presets round-trip keyed by the mapping key only.
        return self.model_dump()
|
|
||
|
|
||
class DetectorsConfig(ConfigBase):
    """
    Detector and inference-config mappings for edge inference.

    Thin alias over ConfigBase: it adds no fields, existing purely to give the
    detectors-only payload shape its own public type.
    """
|
|
||
|
|
||
class EdgeEndpointConfig(ConfigBase):
    """
    Top-level edge endpoint configuration.

    Combines the detector/inference-config mappings from ConfigBase with
    endpoint-wide settings in `global_config`.
    """

    global_config: GlobalConfig = Field(default_factory=GlobalConfig)

    @classmethod
    def from_yaml(
        cls,
        filename: Optional[str] = None,
        yaml_str: Optional[str] = None,
    ) -> "EdgeEndpointConfig":
        """Create an EdgeEndpointConfig from a YAML filename or YAML string.

        Exactly one of `filename` or `yaml_str` must be provided.

        Raises:
            ValueError: If neither or both sources are given, or `filename`
                is blank.
        """
        if filename is None and yaml_str is None:
            raise ValueError("Either filename or yaml_str must be provided.")
        if filename is not None and yaml_str is not None:
            raise ValueError("Only one of filename or yaml_str can be provided.")
        if filename is not None:
            if not filename.strip():
                raise ValueError("filename must be a non-empty path when provided.")
            # YAML streams are UTF-8 by spec; decode explicitly so the
            # platform's locale default encoding cannot change parsing results.
            with open(filename, "r", encoding="utf-8") as f:
                yaml_str = f.read()

        # An empty/whitespace-only document parses to None; fall back to {}
        # so we still construct a valid, default-populated config.
        yaml_text = yaml_str or ""
        parsed = yaml.safe_load(yaml_text) or {}
        return cls.model_validate(parsed)

    @classmethod
    def from_payload(cls, payload: dict[str, Any]) -> "EdgeEndpointConfig":
        """Construct an EdgeEndpointConfig from a payload dictionary."""
        return cls.model_validate(payload)
|
|
||
|
|
||
# Preset inference configs matching the standard edge-endpoint defaults.
# InferenceConfig is frozen, so these shared singletons cannot be mutated by callers.

# Out-of-the-box behavior: all InferenceConfig field defaults.
DEFAULT = InferenceConfig(name="default")

# Always answer at the edge, but still escalate to the cloud
# (rate-limited by min_time_between_escalations).
EDGE_ANSWERS_WITH_ESCALATION = InferenceConfig(
    name="edge_answers_with_escalation",
    always_return_edge_prediction=True,
    min_time_between_escalations=2.0,
)

# Always answer at the edge and never escalate to the cloud.
NO_CLOUD = InferenceConfig(
    name="no_cloud",
    always_return_edge_prediction=True,
    disable_cloud_escalation=True,
)

# Reject image queries for detectors using this preset.
DISABLED = InferenceConfig(name="disabled", enabled=False)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, this is a little redundant since it includes all objects that could be imported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ChatGPT says: "good point. all is kept intentionally to make the public API explicit/stable for this new module (and to control wildcard-import surface), but it can be removed if repo convention prefers omitting it."
I'm not really sure which way is best...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now it's fine either way, worst that can happen leaving it in is that someone adds something later and gets caught off guard when it doesn't import the way they expect. Leave it since it's less keystrokes now that it's already in