-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathfdv1.py
More file actions
181 lines (150 loc) · 6.63 KB
/
fdv1.py
File metadata and controls
181 lines (150 loc) · 6.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from threading import Event
from typing import Optional
from ldclient.config import Config
from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl
from ldclient.impl.datasource.polling import PollingUpdateProcessor
from ldclient.impl.datasource.status import (
DataSourceStatusProviderImpl,
DataSourceUpdateSinkImpl
)
from ldclient.impl.datasource.streaming import StreamingUpdateProcessor
from ldclient.impl.datastore.status import (
DataStoreStatusProviderImpl,
DataStoreUpdateSinkImpl
)
from ldclient.impl.datasystem import DataAvailability, DiagnosticAccumulator
from ldclient.impl.flag_tracker import FlagTrackerImpl
from ldclient.impl.listeners import Listeners
from ldclient.impl.stubs import NullUpdateProcessor
from ldclient.interfaces import (
DataSourceState,
DataSourceStatus,
DataSourceStatusProvider,
DataStoreStatusProvider,
FeatureStore,
FlagTracker,
ReadOnlyStore,
UpdateProcessor
)
# Delayed import inside __init__ to avoid circular dependency with ldclient.client
class FDv1:
"""
FDv1 wires the existing v1 data source and store behavior behind the
generic DataSystem surface.
"""
def __init__(self, config: Config):
self._config = config
# Set up data store plumbing
self._data_store_listeners = Listeners()
self._data_store_update_sink = DataStoreUpdateSinkImpl(
self._data_store_listeners
)
# Import here to avoid circular import
from ldclient.client import _FeatureStoreClientWrapper
self._store_wrapper: FeatureStore = _FeatureStoreClientWrapper(
self._config.feature_store, self._data_store_update_sink
)
self._data_store_status_provider_impl = DataStoreStatusProviderImpl(
self._store_wrapper, self._data_store_update_sink
)
# Set up data source plumbing
self._data_source_listeners = Listeners()
self._flag_change_listeners = Listeners()
self._flag_tracker_impl = FlagTrackerImpl(
self._flag_change_listeners,
lambda key, context: None, # Replaced by client to use its evaluation method
)
self._data_source_update_sink = DataSourceUpdateSinkImpl(
self._store_wrapper,
self._data_source_listeners,
self._flag_change_listeners,
)
self._data_source_status_provider_impl = DataSourceStatusProviderImpl(
self._data_source_listeners, self._data_source_update_sink
)
# Ensure v1 processors can find the sink via config for status updates
self._config._data_source_update_sink = self._data_source_update_sink
# Update processor created in start(), because it needs the ready Event
self._update_processor: Optional[UpdateProcessor] = None
# Diagnostic accumulator provided by client for streaming metrics
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
# Track current data availability
self._data_availability: DataAvailability = (
DataAvailability.CACHED
if getattr(self._store_wrapper, "initialized", False)
else DataAvailability.DEFAULTS
)
# React to data source status updates to adjust availability
def _on_status_change(status: DataSourceStatus):
if status.state == DataSourceState.VALID:
self._data_availability = DataAvailability.REFRESHED
self._data_source_status_provider_impl.add_listener(_on_status_change)
def start(self, set_on_ready: Event):
"""
Starts the v1 update processor and returns immediately. The provided
Event is set by the processor upon first successful initialization or
upon permanent failure.
"""
update_processor = self._make_update_processor(
self._config, self._store_wrapper, set_on_ready
)
self._update_processor = update_processor
update_processor.start()
def stop(self):
if self._update_processor is not None:
self._update_processor.stop()
@property
def store(self) -> ReadOnlyStore:
return self._store_wrapper
def set_flag_value_eval_fn(self, eval_fn):
"""
Injects the flag value evaluation function used by the flag tracker to
compute FlagValueChange events. The function signature should be
(key: str, context: Context) -> Any.
"""
self._flag_tracker_impl = FlagTrackerImpl(self._flag_change_listeners, eval_fn)
def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
"""
Sets the diagnostic accumulator for streaming initialization metrics.
This should be called before start() to ensure metrics are collected.
"""
self._diagnostic_accumulator = diagnostic_accumulator
@property
def data_source_status_provider(self) -> DataSourceStatusProvider:
return self._data_source_status_provider_impl
@property
def data_store_status_provider(self) -> DataStoreStatusProvider:
return self._data_store_status_provider_impl
@property
def flag_tracker(self) -> FlagTracker:
return self._flag_tracker_impl
@property
def data_availability(self) -> DataAvailability:
if self._config.offline:
return DataAvailability.DEFAULTS
if self._update_processor is not None and self._update_processor.initialized():
return DataAvailability.REFRESHED
if self._store_wrapper.initialized:
return DataAvailability.CACHED
return DataAvailability.DEFAULTS
@property
def target_availability(self) -> DataAvailability:
if self._config.offline:
return DataAvailability.DEFAULTS
# In LDD mode or normal connected modes, the ideal is to be refreshed
return DataAvailability.REFRESHED
def _make_update_processor(self, config: Config, store: FeatureStore, ready: Event):
# Mirrors LDClient._make_update_processor but scoped for FDv1
if config.update_processor_class:
return config.update_processor_class(config, store, ready)
if config.offline or config.use_ldd:
return NullUpdateProcessor(config, store, ready)
if config.stream:
return StreamingUpdateProcessor(config, store, ready, self._diagnostic_accumulator)
# Polling mode
feature_requester = (
config.feature_requester_class(config)
if config.feature_requester_class is not None
else FeatureRequesterImpl(config)
)
return PollingUpdateProcessor(config, feature_requester, store, ready)