-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathconfig_resolver.py
More file actions
226 lines (193 loc) · 7.59 KB
/
config_resolver.py
File metadata and controls
226 lines (193 loc) · 7.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
from __future__ import annotations
import functools
from collections.abc import Sequence
from .read_write_lock import ReadWriteLock
from .config_value_unwrapper import ConfigValueUnwrapper
from .context import Context
from ._internal_logging import InternalLogger
import prefab_pb2 as Prefab
# Module-level logger; used below to report unknown criterion operators.
logger = InternalLogger(__name__)
class ConfigResolver:
    """Resolve config keys to evaluations using a locally cached store.

    The store is whatever ``config_loader.calc_config()`` returns; it is
    swapped in under the write lock and read under the read lock.
    """

    def __init__(self, base_client, config_loader):
        """Set up the resolver and load the initial local store.

        Args:
            base_client: Client object; its ``global_context`` is read when
                building evaluation contexts.
            config_loader: Object whose ``calc_config()`` yields the store.
        """
        self.local_store = None
        self.lock = ReadWriteLock()
        self.base_client = base_client
        self.config_loader = config_loader
        self.project_env_id = 0
        # Assigned through the property setter, which normalizes the value
        # into a plain dict.
        self.default_context = {}
        self.make_local()

    def get(self, key, context=None) -> "Evaluation | None":
        """Evaluate the config stored under ``key``.

        Args:
            key: Config key to look up in the local store.
            context: Optional extra context merged on top of the global,
                default and current contexts.

        Returns:
            The evaluation of the stored config (``None`` when no
            conditional value matches), or, for an unknown key, an
            ``Evaluation`` whose ``config`` and ``value`` are both ``None``.
        """
        with self.lock.read_locked():
            raw_config = self.raw(key)
            if raw_config is not None:
                return self.evaluate(raw_config, context=context)
            # Unknown key: still hand back an Evaluation carrying the merged
            # context so callers get a uniform object to work with.
            return Evaluation(
                config=None,
                value=None,
                config_row_index=0,
                value_index=0,
                context=self.evaluation_context(context),
                resolver=self,
            )

    def raw(self, key) -> Prefab.Config | None:
        """Return the stored config for ``key``, or ``None`` if absent."""
        entry = self.local_store.get(key)
        if entry is None:
            return None
        return entry["config"]

    def evaluate(self, config, context=None) -> "Evaluation | None":
        """Run the criteria evaluator for ``config`` against the merged context."""
        evaluator = CriteriaEvaluator(
            config,
            project_env_id=self.project_env_id,
            resolver=self,
            base_client=self.base_client,
        )
        return evaluator.evaluate(self.evaluation_context(context))

    def evaluation_context(self, context):
        """Build the context used for an evaluation.

        Layers are merged in this order: the client's global context, this
        resolver's default context, the current ``Context`` (if any), then
        the explicit ``context`` argument (if truthy).
        """
        merged = Context()
        merged.merge_context_dict(self.base_client.global_context.to_dict())
        merged.merge_context_dict(self.default_context)
        # Look the current context up once so the truthiness check and the
        # merge operate on the same object.
        current = Context.get_current()
        if current:
            merged.merge_context_dict(current.to_dict())
        if context:
            merged.merge_context_dict(
                Context.normalize_context_arg(context).to_dict()
            )
        return merged

    def update(self):
        """Rebuild the local store from the config loader."""
        self.make_local()

    def make_local(self):
        """Replace the local store with a fresh ``calc_config()`` result."""
        with self.lock.write_locked():
            self.local_store = self.config_loader.calc_config()

    @property
    def default_context(self):
        """Default context as a plain dict."""
        return self._default_context

    @default_context.setter
    def default_context(self, value):
        self._default_context = Context.normalize_context_arg(value).to_dict()
# Short alias for the criterion operator constants compared against in
# CriteriaEvaluator.evaluate_criterion.
OPS = Prefab.Criterion.CriterionOperator
class CriteriaEvaluator:
    """Select the first conditional value of a config whose criteria all match."""

    def __init__(self, config, project_env_id, resolver, base_client):
        """Store the config to evaluate and the collaborators used to do so.

        Args:
            config: Config whose ``rows`` hold the conditional values.
            project_env_id: Environment id used to pick the matching row.
            resolver: Resolver used to look up segments by key.
            base_client: Owning client (stored, not otherwise used here).
        """
        self.config = config
        self.project_env_id = project_env_id
        self.resolver = resolver
        self.base_client = base_client

    def evaluate(self, props):
        """Return an ``Evaluation`` for the first matching conditional value.

        Values of the environment-specific row are tried first, then the
        values of the fallback row. Returns ``None`` when nothing matches.
        """
        matching_env_row_values = self.matching_environment_row_values()
        # The environment row, when present, is reported as row index 0 and
        # the fallback row as index 1; without it the fallback row is index 0.
        default_row_index = 1 if matching_env_row_values else 0

        for value_index, conditional_value in enumerate(matching_env_row_values):
            if self.all_criteria_match(conditional_value, props):
                return Evaluation(
                    self.config,
                    conditional_value.value,
                    value_index,
                    0,
                    props,
                    self.resolver,
                )

        for value_index, conditional_value in enumerate(self.default_row_values()):
            if self.all_criteria_match(conditional_value, props):
                return Evaluation(
                    self.config,
                    conditional_value.value,
                    value_index,
                    default_row_index,
                    props,
                    self.resolver,
                )

        return None

    def all_criteria_match(self, conditional_value, props):
        """Return True when every criterion matches (vacuously True if none)."""
        return all(
            self.evaluate_criterion(criterion, props)
            for criterion in conditional_value.criteria
        )

    def evaluate_criterion(self, criterion, properties):
        """Evaluate a single criterion against ``properties``.

        Unknown operators are logged and treated as not matching.
        """
        value_from_properties = properties.get(criterion.property_name)
        operator = criterion.operator

        if operator in (OPS.LOOKUP_KEY_IN, OPS.PROP_IS_ONE_OF):
            return self.matches(criterion, value_from_properties, properties)
        if operator in (OPS.LOOKUP_KEY_NOT_IN, OPS.PROP_IS_NOT_ONE_OF):
            return not self.matches(criterion, value_from_properties, properties)
        if operator == OPS.IN_SEG:
            return self.in_segment(criterion, properties)
        if operator == OPS.NOT_IN_SEG:
            return not self.in_segment(criterion, properties)
        if operator == OPS.PROP_ENDS_WITH_ONE_OF:
            if value_from_properties is None:
                return False
            text = str(value_from_properties)
            return any(
                text.endswith(ending)
                for ending in criterion.value_to_match.string_list.values
            )
        if operator == OPS.PROP_DOES_NOT_END_WITH_ONE_OF:
            if value_from_properties is None:
                return True
            text = str(value_from_properties)
            return not any(
                text.endswith(ending)
                for ending in criterion.value_to_match.string_list.values
            )
        if operator == OPS.HIERARCHICAL_MATCH:
            # A missing property cannot match a prefix; previously this
            # raised AttributeError on None (and on non-string values).
            if value_from_properties is None:
                return False
            return str(value_from_properties).startswith(
                criterion.value_to_match.string
            )
        if operator == OPS.ALWAYS_TRUE:
            return True

        logger.info(f"Unknown criterion operator {criterion.operator}")
        return False

    def matches(self, criterion, value, properties):
        """Compare ``value`` with the criterion's unwrapped value(s).

        For a non-string sequence the test is membership of ``str(value)``;
        otherwise it is plain equality.
        """
        # NOTE(review): Evaluation.deepest_value in this file calls the same
        # helper with (value, config, resolver, context); this call passes
        # (value, config key, properties). Confirm which signature is current.
        criterion_value_or_values = ConfigValueUnwrapper.deepest_value(
            criterion.value_to_match, self.config.key, properties
        ).unwrap()
        is_collection = isinstance(
            criterion_value_or_values, Sequence
        ) and not isinstance(criterion_value_or_values, (str, bytes))
        if is_collection:
            return str(value) in criterion_value_or_values
        return value == criterion_value_or_values

    def in_segment(self, criterion, properties):
        """Return the boolean value of the segment named by the criterion.

        A segment that cannot be resolved (no evaluation, or an evaluation
        without a value) counts as "not in segment" instead of raising.
        """
        evaluation = self.resolver.get(
            criterion.value_to_match.string, context=properties
        )
        if evaluation is None:
            return False
        segment_value = evaluation.raw_config_value()
        if segment_value is None:
            return False
        return segment_value.bool

    def matching_environment_row_values(self):
        """Return the values of the first row for this environment, else ``[]``."""
        return next(
            (
                row.values
                for row in self.config.rows
                if row.project_env_id == self.project_env_id
            ),
            [],
        )

    def default_row_values(self):
        """Return the values of the first row NOT for this environment, else ``[]``."""
        return next(
            (
                row.values
                for row in self.config.rows
                if row.project_env_id != self.project_env_id
            ),
            [],
        )
class Evaluation:
    """Result of evaluating a config: the chosen value plus where it came from."""

    # Marks "deepest value not computed yet"; a dedicated object is used so
    # that a legitimately falsy/None result is still cached.
    _UNSET = object()

    def __init__(
        self,
        config: Prefab.Config | None,
        value: Prefab.ConfigValue | None,
        value_index: int,
        config_row_index: int,
        context: Context,
        resolver: ConfigResolver,
    ):
        """Record the evaluation outcome.

        Args:
            config: Config that was evaluated, or ``None`` for an unknown key.
            value: Selected raw config value, or ``None`` if there is none.
            value_index: Index of the selected value within its row.
            config_row_index: Index of the row the value was taken from.
            context: Context the evaluation ran against.
            resolver: Resolver that produced this evaluation.
        """
        self.config = config
        self.value = value
        self.value_index = value_index
        self.config_row_index = config_row_index
        self.context = context
        self.resolver = resolver
        self._deepest_value = self._UNSET

    def unwrapped_value(self):
        """Return the unwrapped form of the deepest value."""
        return self.deepest_value().unwrap()

    def raw_config_value(self):
        """Return the raw selected value exactly as stored."""
        return self.value

    def deepest_value(self):
        """Return the deepest value for this evaluation, computed at most once.

        The result is memoized on the instance. A ``functools.cache`` on the
        method would key a process-wide cache on ``self`` and keep every
        Evaluation (with its context and resolver) alive indefinitely.
        """
        if self._deepest_value is self._UNSET:
            self._deepest_value = ConfigValueUnwrapper.deepest_value(
                self.value, self.config, self.resolver, self.context
            )
        return self._deepest_value