-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathdefault_factory.py
More file actions
358 lines (323 loc) · 14.9 KB
/
default_factory.py
File metadata and controls
358 lines (323 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
import inspect
import threading
from typing import Any
from digitalpy.core.digipy_configuration.domain.model.configuration import Configuration
from digitalpy.core.main.factory import Factory
import json
import importlib
class DefaultFactory(Factory):
required_interfaces = {
"event_manager": "digitalpy.core.main.event_manager.EventManager",
"logger": "logger.Logger",
"log_manager": "log_manager.LogManager",
"session": "session.Session",
"configuration": "digitalpy.core.configuration.Configuration",
"message": "message.Message",
"concurrency_manager": "concurrency_manager.ConcurrencyManager",
"action_mapper": "digitalpy.core.zmanager.action_mapper.ActionMapper",
"request": "digitalpy.core.zmanager.request.Request",
"response": "digitalpy.core.zmanager.response.Response",
"list_strategies": "list_strategy.ListStrategy",
"formats": "digitalpy.core.parsing.format.Format",
"formatter": "digitalpy.core.parsing.formatter.Formatter",
"principal_factory": "principal_factory.PrincipalFactory",
}
def __init__(self, configuration: Configuration):
self.configuration = configuration
self.current_stack = []
# factory instance is registered for use by the routing worker so that
# the instances in the instance dictionary can be preserved when the
# new object factory is instantiated in the sub-process
self.instances = {
"configuration": self.configuration,
"factory": self,
}
# store imported modules to prevent multiple imports
self.modules = {}
self.stack_lock = threading.Lock()
self._module_lock = threading.Lock()
self._instances_lock = threading.Lock()
self._instances_condition = threading.Condition(self._instances_lock)
def add_interfaces(self, interfaces: dict):
raise NotImplementedError("this method has not yet been implemented")
def clear(self):
with self._instances_lock:
self.instances = {
"configuration": self.configuration,
"factory": self,
}
with self.stack_lock:
self.current_stack = []
with self._module_lock:
self.modules = {}
DefaultFactory.required_interfaces = {
"event_manager": "digitalpy.core.main.event_manager.EventManager",
"logger": "logger.Logger",
"log_manager": "log_manager.LogManager",
"session": "session.Session",
"configuration": "digitalpy.core.configuration.Configuration",
"message": "message.Message",
"concurrency_manager": "concurrency_manager.ConcurrencyManager",
"action_mapper": "digitalpy.core.zmanager.action_mapper.ActionMapper",
"request": "digitalpy.core.zmanager.request.Request",
"response": "digitalpy.core.zmanager.response.Response",
"list_strategies": "list_strategy.ListStrategy",
"formats": "digitalpy.core.parsing.format.Format",
"formatter": "digitalpy.core.parsing.formatter.Formatter",
"principal_factory": "principal_factory.PrincipalFactory",
}
def get_instance(self, name, dynamic_configuration={}) -> object:
instance = None
with self.stack_lock:
self.current_stack.append(name)
instance_key = self.get_instance_key(name, dynamic_configuration)
if (
instance_key in self.instances
and dynamic_configuration.get("__cached", True) is True
):
instance = self._access_instance(instance_key)
else:
static_configuration = self.configuration.get_section(name, True)
configuration = dict(static_configuration, **dynamic_configuration)
instance = self.create_instance(name, configuration, instance_key)
self.current_stack.pop()
return instance
def get_instance_key(self, name, dynamic_configuration: dict):
"""Get the instance key for the given name and dynamic configuration"""
key_conf = dynamic_configuration.copy()
# remove the __cached key from the configuration
key_conf.pop("__cached", None)
if len(key_conf) == 0:
instance_key = name.lower()
else:
try:
instance_key = name + json.dumps(key_conf, sort_keys=True)
# exception caught where the values of dynamic_configuration are not json serializable
# but are required to be passed as arguments
except TypeError:
instance_key = name
return instance_key
def create_instance(self, name, configuration, instance_key):
instance = None
if configuration.get("__class") is not None:
class_name = configuration.get("__class")
class_name_parts = class_name.split(".")
if len(class_name_parts) == 2:
instance_class = getattr(
self.import_module(class_name_parts[0]),
class_name_parts[1],
)
else:
instance_class = getattr(
self.import_module(".".join(class_name_parts[:-1])),
class_name_parts[-1],
)
if callable(getattr(instance_class, "__init__", None)):
c_params = {}
instance_class_func = getattr(instance_class, "__init__")
instance_class_params = inspect.signature(instance_class_func)
for (
param_name,
param_default,
) in instance_class_params.parameters.items():
if (
param_name == "self"
or param_name == "args"
or param_name == "kwargs"
):
continue
param_instance_key = param_name.lower().replace("_", "")
# first check the configuration section for the parameter
if param_name in configuration:
c_params[param_name] = self.resolve_value(
configuration[param_name]
)
# then check if a parameter has already been initialized
elif param_instance_key in self.instances:
c_params[param_name] = self._access_instance(param_instance_key)
# check if a section with the name of the parameter exists
elif self.configuration.has_section(param_name):
c_params[param_name] = self.get_instance(param_name)
# check if a section with the name of the parameter in lowercase exists
elif self.configuration.has_section(param_instance_key):
c_params[param_name] = self.get_instance(param_instance_key)
elif isinstance(param_default, inspect._empty):
raise Exception(
f"constructor parameter {param_name} in class {name} cannot be injected"
)
instance = instance_class(**c_params)
interface = self.get_interface(name)
if interface != None and not isinstance(instance, interface):
raise Exception(
f"class {instance_class} is required to implement interface {interface}"
)
if (
"__shared" not in configuration
or configuration["__shared"] == "true"
):
self.register_instance(instance_key, instance)
for key, value in configuration.items():
if not key.startswith("__") and c_params.get(key, None) != None:
value = self.resolve_value(value)
setter_name = self.get_setter_name(key)
if getattr(instance, setter_name, None) != None:
getattr(instance, setter_name)(value)
else:
try:
setattr(instance, key, value)
except AttributeError:
# attribute might be a read-only property
pass
else:
# TODO: figure out the cases for a mapping being called and how to implement
interface = self.get_interface(name)
for key, value in configuration.items():
if value[0] == "$":
obj = self.get_instance(value.strip("$"))
if interface is not None and not isinstance(obj, interface):
raise ValueError(
f"class of {name}.{key} is required to implement interface {interface}."
)
configuration[key] = obj
self.register_instance(instance_key, configuration)
instance = configuration
return instance
def get_setter_name(self, property):
return "set" + property
def resolve_value(self, value):
if isinstance(value, str):
if value.lower() == "true":
value = True
elif value.lower() == "false":
value = False
try:
value = int(value)
except ValueError:
try:
value = self.get_instance(value)
except ValueError:
pass
if isinstance(value, list):
result = []
contains_instance = False
for val in value:
if (
isinstance(val, str)
and val.lower() != "true"
and val.lower() != "false"
):
result.append(self.get_instance(val))
contains_instance = True
else:
result.append(val)
if contains_instance:
value = result
return value
def get_interface(self, name):
if name in DefaultFactory.required_interfaces:
class_name = DefaultFactory.required_interfaces[name]
class_name_parts = class_name.split(".")
if len(class_name_parts) == 2:
instance_class = getattr(
importlib.import_module(".", class_name_parts[0]),
class_name_parts[1],
)
else:
instance_class = getattr(
importlib.import_module(".".join(class_name_parts[:-1])),
class_name_parts[-1],
)
return instance_class
return None
def register_instance(self, name: str, instance: object):
"""Register an instance with the factory
Args:
name (str): the name of the instance
instance (object): the instance to be registered
"""
with self._instances_lock:
instance_key = name.lower()
self.instances[instance_key] = instance
self._instances_condition.notify_all()
def _access_instance(self, key: str) -> Any:
"""Access an instance by name
Args:
key (str): the name of the instance
Returns:
Any: the instance if it exists, otherwise None
"""
if self._instances_lock.locked():
self._instances_condition.wait()
instance_key = key.lower()
if instance_key in self.instances:
return self.instances[instance_key]
return None
def get_instance_of(self, class_name, dynamic_configuration={}):
configuration = {
**{"__class": class_name, "__shared": False},
**dynamic_configuration,
}
# check if an instance of the class has already been created note this is only applicable
# for classes that have a defined section in a configuration file
instance = self._access_instance(class_name)
if instance is not None:
return instance
return self.create_instance(class_name, configuration, None)
def get_new_instance(self, name, dynamic_configuration={}):
configuration = {**dynamic_configuration, "__shared": False}
instance = self.get_instance(name, configuration)
return instance
def clear_instance(self, name: str):
with self._instances_lock:
instance_key = name.lower()
if instance_key in self.instances:
del self.instances[instance_key]
self._instances_condition.notify_all()
def import_module(self, module_name):
with self._module_lock:
module = self.modules.get(module_name)
if module is None:
module = importlib.import_module(module_name)
self.modules[module_name] = module
return module
def __getstate__(self) -> object:
tmp_dict = self.__dict__.copy()
if "modules" in tmp_dict:
del tmp_dict["modules"]
# store the necessary instances in the instance dictionary but delete all others
# this is to prevent serialization issues as some instances may reference an unserializable object
if "instances" in tmp_dict:
# the internal action mappers of all components are necessary to initialize the component
# as it is a dependency of the component which must be injected. In other words, without
# the action mapper, trying to initialize the component will result in an infinite loop
# with the component trying to initialize the action mapper and the action mapper trying to
# initialize the component.
for key in list(tmp_dict["instances"].keys()):
if not key.endswith("actionmapper"):
del tmp_dict["instances"][key]
tmp_dict["instances"]["configuration"] = self.configuration
tmp_dict["instances"]["factory"] = self
# delete the locks
if "stack_lock" in tmp_dict:
del tmp_dict["stack_lock"]
if "_instances_lock" in tmp_dict:
del tmp_dict["_instances_lock"]
if "_instances_condition" in tmp_dict:
del tmp_dict["_instances_condition"]
if "_module_lock" in tmp_dict:
del tmp_dict["_module_lock"]
return tmp_dict
def __setstate__(self, state: dict) -> None:
self.__dict__.update(state)
self.stack_lock = threading.Lock()
self._instances_lock = threading.Lock()
self._instances_condition = threading.Condition(self._instances_lock)
self._module_lock = threading.Lock()
self.modules = {}
def __str__(self) -> str:
return f"""DefaultFactory(
Configuration:
{self.configuration}
Instances:
{self.instances}
)"""