-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathstatus.py
More file actions
109 lines (81 loc) · 3.31 KB
/
status.py
File metadata and controls
109 lines (81 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import time
from copy import copy
from typing import Callable, Optional
from ldclient.impl.datasystem.store import Store
from ldclient.impl.listeners import Listeners
from ldclient.impl.rwlock import ReadWriteLock
from ldclient.interfaces import (
DataSourceErrorInfo,
DataSourceState,
DataSourceStatus,
DataSourceStatusProvider,
DataStoreStatus,
DataStoreStatusProvider,
FeatureStore
)
class DataSourceStatusProviderImpl(DataSourceStatusProvider):
def __init__(self, listeners: Listeners):
self.__listeners = listeners
self.__status = DataSourceStatus(DataSourceState.INITIALIZING, 0, None)
self.__lock = ReadWriteLock()
@property
def status(self) -> DataSourceStatus:
self.__lock.rlock()
status = self.__status
self.__lock.runlock()
return status
def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]):
status_to_broadcast = None
try:
self.__lock.lock()
old_status = self.__status
if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING:
new_state = DataSourceState.INITIALIZING
if new_state == old_status.state and new_error is None:
return
new_since = self.__status.since if new_state == self.__status.state else time.time()
new_error = self.__status.error if new_error is None else new_error
self.__status = DataSourceStatus(new_state, new_since, new_error)
status_to_broadcast = self.__status
finally:
self.__lock.unlock()
if status_to_broadcast is not None:
self.__listeners.notify(status_to_broadcast)
def add_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.add(listener)
def remove_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.remove(listener)
class DataStoreStatusProviderImpl(DataStoreStatusProvider):
def __init__(self, store: Optional[FeatureStore], listeners: Listeners):
self.__store = store
self.__listeners = listeners
self.__lock = ReadWriteLock()
self.__status = DataStoreStatus(True, False)
def update_status(self, status: DataStoreStatus):
"""
update_status is called from the data store to push a status update.
"""
self.__lock.lock()
modified = False
if self.__status != status:
self.__status = status
modified = True
self.__lock.unlock()
if modified:
self.__listeners.notify(status)
@property
def status(self) -> DataStoreStatus:
self.__lock.rlock()
status = copy(self.__status)
self.__lock.runlock()
return status
def is_monitoring_enabled(self) -> bool:
if self.__store is None:
return False
if hasattr(self.__store, "is_monitoring_enabled") is False:
return False
return self.__store.is_monitoring_enabled() # type: ignore
def add_listener(self, listener: Callable[[DataStoreStatus], None]):
self.__listeners.add(listener)
def remove_listener(self, listener: Callable[[DataStoreStatus], None]):
self.__listeners.remove(listener)