-
Notifications
You must be signed in to change notification settings - Fork 47
Expand file tree
/
Copy pathredis_feature_store.py
More file actions
123 lines (101 loc) · 4.44 KB
/
redis_feature_store.py
File metadata and controls
123 lines (101 loc) · 4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import json
from typing import Any, Dict
from ldclient import log
from ldclient.feature_store_helpers import CachingStoreWrapper
from ldclient.impl.util import redact_password
from ldclient.interfaces import DiagnosticDescription, FeatureStoreCore
from ldclient.versioned_data_kind import FEATURES
have_redis = False
try:
import redis
have_redis = True
except ImportError:
pass
class _RedisFeatureStoreCore(DiagnosticDescription, FeatureStoreCore):
def __init__(self, url, prefix, redis_opts: Dict[str, Any]):
if not have_redis:
raise NotImplementedError("Cannot use Redis feature store because redis package is not installed")
self._prefix = prefix or 'launchdarkly'
self._init_key = "{0}:{1}".format(self._prefix, CachingStoreWrapper.__INITED_CACHE_KEY__)
self._pool = redis.ConnectionPool.from_url(url=url, **redis_opts)
self.test_update_hook = None # exposed for testing
log.info("Started RedisFeatureStore connected to URL: " + redact_password(url) + " using prefix: " + self._prefix)
def is_available(self) -> bool:
try:
self.initialized_internal()
return True
except BaseException:
return False
def _items_key(self, kind):
return "{0}:{1}".format(self._prefix, kind.namespace)
def init_internal(self, all_data):
pipe = redis.Redis(connection_pool=self._pool).pipeline()
all_count = 0
for kind, items in all_data.items():
base_key = self._items_key(kind)
pipe.delete(base_key)
for key, item in items.items():
item_json = json.dumps(item)
pipe.hset(base_key, key, item_json)
all_count = all_count + len(items)
pipe.set(self._init_key, self._init_key)
pipe.execute()
log.info("Initialized RedisFeatureStore with %d items", all_count)
def get_all_internal(self, kind):
r = redis.Redis(connection_pool=self._pool)
all_items = r.hgetall(self._items_key(kind))
if all_items is None or all_items == "":
all_items = {}
results = {}
for key, item_json in all_items.items():
key = key.decode('utf-8') # necessary in Python 3
results[key] = json.loads(item_json.decode('utf-8'))
return results
def get_internal(self, kind, key):
r = redis.Redis(connection_pool=self._pool)
item_json = r.hget(self._items_key(kind), key)
if item_json is None or item_json == "":
log.debug("RedisFeatureStore: key %s not found in '%s'. Returning None.", key, kind.namespace)
return None
return json.loads(item_json.decode('utf-8'))
def upsert_internal(self, kind, item):
r = redis.Redis(connection_pool=self._pool)
base_key = self._items_key(kind)
key = item['key']
item_json = json.dumps(item)
while True:
pipeline = r.pipeline()
pipeline.watch(base_key)
old = self.get_internal(kind, key)
if self.test_update_hook is not None:
self.test_update_hook(base_key, key)
if old and old['version'] >= item['version']:
log.debug(
'RedisFeatureStore: Attempted to %s key: %s version %d with a version that is the same or older: %d in "%s"',
'delete' if item.get('deleted') else 'update',
key,
old['version'],
item['version'],
kind.namespace,
)
pipeline.unwatch()
return old
else:
pipeline.multi()
pipeline.hset(base_key, key, item_json)
try:
pipeline.execute()
# Unlike Redis implementations for other platforms, in redis-py a failed WATCH
# produces an exception rather than a null result from execute().
except redis.exceptions.WatchError:
log.debug("RedisFeatureStore: concurrent modification detected, retrying")
continue
return item
def initialized_internal(self):
r = redis.Redis(connection_pool=self._pool)
return r.exists(self._init_key)
def describe_configuration(self, config):
return 'Redis'
def _before_update_transaction(self, base_key, key):
# exposed for testing
pass