This repository was archived by the owner on Mar 23, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 774
Expand file tree
/
Copy pathcomponent_registry.py
More file actions
106 lines (87 loc) · 3.58 KB
/
component_registry.py
File metadata and controls
106 lines (87 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import os
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union
from taskweaver.utils import glob_files
# Type variable for the kind of component a ComponentRegistry stores; it
# parameterizes the registry's Generic base and its Dict/List annotations.
component_type = TypeVar("component_type")
class ComponentDisabledException(Exception):
    """Signal that a component is disabled and must not be registered.

    ``ComponentRegistry.get_registry`` catches this exception around its
    ``_load_component`` call and skips the file without reporting an error.
    """
class ComponentRegistry(ABC, Generic[component_type]):
    """Cached, name-keyed collection of components loaded from files.

    Files are discovered with ``glob_files(file_glob)`` and turned into
    ``(name, component)`` pairs by the subclass-provided
    :meth:`_load_component`.  The resulting mapping is cached and reused
    until it is invalidated, forced to reload, or considered stale.
    """

    def __init__(
        self,
        file_glob: Union[str, List[str]],
        ttl: Optional[timedelta] = None,
    ) -> None:
        """Create an empty (not yet loaded) registry.

        Args:
            file_glob: Glob pattern, or list of patterns, handed to
                ``glob_files`` to discover component files.
            ttl: Maximum age of the cached registry; ``None`` disables the
                age limit.
        """
        super().__init__()
        # ``None`` means "nothing cached": never loaded, or invalidated by
        # the ``file_glob`` setter.
        self._registry: Optional[Dict[str, component_type]] = None
        # Time of the last load; starts at the epoch.
        self._registry_update: datetime = datetime.fromtimestamp(0)
        self._file_glob: Union[str, List[str]] = file_glob
        self._ttl: Optional[timedelta] = ttl

    @abstractmethod
    def _load_component(self, path: str) -> Tuple[str, component_type]:
        """Load the component stored at *path*.

        Returns:
            A ``(name, component)`` pair; ``name`` becomes the registry key.
        """
        raise NotImplementedError

    def is_available(self, freshness: Optional[timedelta] = None) -> bool:
        """Tell whether the cached registry can be served as-is.

        Args:
            freshness: Optional per-call age limit, applied in addition to
                the registry-wide ``ttl``.

        Returns:
            ``False`` if nothing is cached or the cache is older than either
            limit, ``True`` otherwise.
        """
        if self._registry is None:
            return False
        staleness = datetime.now() - self._registry_update
        if self._ttl is not None and staleness > self._ttl:
            return False
        if freshness is not None and staleness > freshness:
            return False
        return True

    def _should_skip_plugin_file(self, path: str) -> bool:
        """Decide whether a globbed file must be ignored.

        Only paths containing ``plugins<sep>project`` are filtered: such a
        file is kept only when the part of its file name before the first
        dot equals the name of its parent directory.  Every other path is
        never skipped.
        """
        # Check the marker first.  It contains a path separator, so a
        # matching path always splits into at least two parts; indexing
        # unconditionally used to raise IndexError for bare file names.
        if os.path.join("plugins", "project") not in path:
            return False
        parts = path.split(os.sep)
        parent_dir, file_name = parts[-2], parts[-1]
        return parent_dir != file_name.split(".")[0]

    def get_registry(
        self,
        force_reload: bool = False,
        freshness: Optional[timedelta] = None,
        show_error: bool = False,
    ) -> Dict[str, component_type]:
        """Return the name -> component mapping, reloading when needed.

        Args:
            force_reload: Reload from files even if the cache is usable.
            freshness: Optional age limit forwarded to :meth:`is_available`.
            show_error: Print a message for each file that fails to load.

        Returns:
            The cached mapping when it is usable, otherwise a freshly built
            one (which then becomes the cache).
        """
        if not force_reload and self.is_available(freshness):
            assert self._registry is not None
            return self._registry

        registry: Dict[str, component_type] = {}
        for path in glob_files(self._file_glob):
            if self._should_skip_plugin_file(path):
                continue
            try:
                name, component = self._load_component(path)
            except ComponentDisabledException:
                # Disabled components are skipped without any message.
                continue
            except Exception as e:
                # Best effort: one broken file must not prevent the other
                # components from being loaded.
                if show_error:
                    print(f"failed to loading component from {path}, skipping: {e}")
                continue
            if component is None:
                if show_error:
                    print(f"failed to loading component from {path}, skipping")
                continue
            registry[name] = component

        self._registry_update = datetime.now()
        self._registry = registry
        return registry

    @property
    def registry(self) -> Dict[str, component_type]:
        """The name -> component mapping, loaded on demand."""
        return self.get_registry()

    def get_list(
        self,
        force_reload: bool = False,
        freshness: Optional[timedelta] = None,
    ) -> List[component_type]:
        """Return all components ordered by their registry name.

        Load failures are printed, since the registry is fetched with
        ``show_error=True``.
        """
        registry = self.get_registry(force_reload, freshness, show_error=True)
        return [registry[name] for name in sorted(registry)]

    @property
    def list(self) -> List[component_type]:
        """All components ordered by their registry name."""
        return self.get_list()

    def get(self, name: str) -> Optional[component_type]:
        """Return the component registered as *name*, or ``None``."""
        return self.registry.get(name, None)

    def __getitem__(self, name: str) -> Optional[component_type]:
        """Same as :meth:`get`: an unknown name yields ``None``, not ``KeyError``."""
        return self.get(name)

    @property
    def file_glob(self) -> Union[str, List[str]]:
        """The glob pattern(s) used to discover component files."""
        return self._file_glob

    @file_glob.setter
    def file_glob(self, file_glob: Union[str, List[str]]) -> None:
        if self._file_glob == file_glob:
            return
        self._file_glob = file_glob
        # Drop the cache so the next access reloads with the new pattern(s).
        self._registry = None