-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathattribute.py
More file actions
135 lines (103 loc) · 4.98 KB
/
attribute.py
File metadata and controls
135 lines (103 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from __future__ import annotations
import ipaddress
from typing import TYPE_CHECKING, Any, get_args
from ..protocols_base import CoreNodeBase
from ..uuidt import UUIDT
from .constants import ATTRIBUTE_METADATA_OBJECT, IP_TYPES, PROPERTIES_FLAG, PROPERTIES_OBJECT, SAFE_VALUE
from .property import NodeProperty
if TYPE_CHECKING:
from collections.abc import Callable
from ..schema import AttributeSchemaAPI
class Attribute:
"""Represents an attribute of a Node, including its schema, value, and properties."""
def __init__(self, name: str, schema: AttributeSchemaAPI, data: Any | dict) -> None:
"""
Args:
name (str): The name of the attribute.
schema (AttributeSchema): The schema defining the attribute.
data (Union[Any, dict]): The data for the attribute, either in raw form or as a dictionary.
"""
self.name = name
self._schema = schema
if not isinstance(data, dict) or "value" not in data:
data = {"value": data}
self._properties_flag = PROPERTIES_FLAG
self._properties_object = PROPERTIES_OBJECT
self._properties = self._properties_flag + self._properties_object
self._read_only = ["updated_at", "is_inherited"]
self.id: str | None = data.get("id")
self._value: Any | None = data.get("value")
self.value_has_been_mutated = False
self.is_default: bool | None = data.get("is_default")
self.is_from_profile: bool | None = data.get("is_from_profile")
if self._value:
value_mapper: dict[str, Callable] = {
"IPHost": ipaddress.ip_interface,
"IPNetwork": ipaddress.ip_network,
}
mapper = value_mapper.get(schema.kind, lambda value: value)
self._value = mapper(data.get("value"))
self.is_inherited: bool | None = data.get("is_inherited")
self.updated_at: str | None = data.get("updated_at")
self.is_protected: bool | None = data.get("is_protected")
self.source: NodeProperty | None = None
self.owner: NodeProperty | None = None
self.updated_by: NodeProperty | None = None
for prop_name in self._properties_object:
if data.get(prop_name):
setattr(self, prop_name, NodeProperty(data=data.get(prop_name))) # type: ignore[arg-type]
for prop_name in ATTRIBUTE_METADATA_OBJECT:
if data.get(prop_name):
setattr(self, prop_name, NodeProperty(data=data.get(prop_name))) # type: ignore[arg-type]
@property
def value(self) -> Any:
return self._value
@value.setter
def value(self, value: Any) -> None:
self._value = value
self.value_has_been_mutated = True
def _generate_input_data(self) -> dict | None:
data: dict[str, Any] = {}
variables: dict[str, Any] = {}
if self.value is None:
if self._schema.optional and self.value_has_been_mutated:
data["value"] = None
return data
if isinstance(self.value, str):
if SAFE_VALUE.match(self.value):
data["value"] = self.value
else:
var_name = f"value_{UUIDT.new().hex}"
variables[var_name] = self.value
data["value"] = f"${var_name}"
elif isinstance(self.value, get_args(IP_TYPES)):
data["value"] = self.value.with_prefixlen
elif isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool():
data["from_pool"] = {"id": self.value.id}
else:
data["value"] = self.value
for prop_name in self._properties_flag:
if getattr(self, prop_name) is not None:
data[prop_name] = getattr(self, prop_name)
for prop_name in self._properties_object:
if getattr(self, prop_name) is not None:
data[prop_name] = getattr(self, prop_name)._generate_input_data()
return {"data": data, "variables": variables}
def _generate_query_data(self, property: bool = False, include_metadata: bool = False) -> dict | None:
data: dict[str, Any] = {"value": None}
if property:
data.update({"is_default": None, "is_from_profile": None})
for prop_name in self._properties_flag:
data[prop_name] = None
for prop_name in self._properties_object:
data[prop_name] = {"id": None, "display_label": None, "__typename": None}
if include_metadata:
data["updated_at"] = None
for prop_name in ATTRIBUTE_METADATA_OBJECT:
data[prop_name] = {"id": None, "display_label": None, "__typename": None}
return data
def _generate_mutation_query(self) -> dict[str, Any]:
if isinstance(self.value, CoreNodeBase) and self.value.is_resource_pool():
# If it points to a pool, ask for the value of the pool allocated resource
return {self.name: {"value": None}}
return {}