Skip to content

Commit 7f885f9

Browse files
committed
Use threading.Event
Signed-off-by: christian.lutnik <christian.lutnik@dynatrace.com>
1 parent c66ecf2 commit 7f885f9

2 files changed

Lines changed: 14 additions & 16 deletions

File tree

  • providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import json
22
import logging
33
import threading
4-
import time
54
import typing
65

76
import grpc
@@ -66,6 +65,7 @@ def __init__(
6665
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
6766
self.deadline = config.deadline_ms * 0.001
6867
self.connected = False
68+
self.connected_event = threading.Event()
6969
self.channel = self._generate_channel(config)
7070
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
7171

@@ -161,12 +161,10 @@ def connect(self) -> None:
161161
)
162162
self.monitor_thread.start()
163163
## block until ready or deadline reached
164-
timeout = self.deadline + time.monotonic()
165-
while not self.connected and time.monotonic() < timeout:
166-
time.sleep(0.05)
167-
logger.debug("Finished blocking gRPC state initialization")
168164

169-
if not self.connected:
165+
if self.connected_event.wait(timeout=self.deadline) and self.connected:
166+
logger.debug("Finished blocking gRPC state initialization")
167+
else:
170168
raise ProviderNotReadyError(
171169
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
172170
)
@@ -204,6 +202,7 @@ def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
204202

205203
logger.debug("gRPC error timer started")
206204
self.timer.start()
205+
self.connected_event.clear()
207206
self.connected = False
208207

209208
def emit_error(self) -> None:
@@ -236,6 +235,7 @@ def listen(self) -> None:
236235
)
237236
)
238237
self.connected = True
238+
self.connected_event.set()
239239
elif message.type == "configuration_change":
240240
data = MessageToDict(message)["data"]
241241
self.handle_changed_flags(data)

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/connector/grpc_watcher.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import json
22
import logging
33
import threading
4-
import time
54
import typing
65

76
import grpc
@@ -50,6 +49,7 @@ def __init__(
5049
self.emit_provider_stale = emit_provider_stale
5150

5251
self.connected = False
52+
self.connected_event = threading.Event()
5353
self.thread: typing.Optional[threading.Thread] = None
5454
self.timer: typing.Optional[threading.Timer] = None
5555

@@ -142,15 +142,11 @@ def connect(self) -> None:
142142
)
143143
self.monitor_thread.start()
144144
## block until ready or deadline reached
145-
timeout = self.deadline + time.monotonic()
146-
while not self.connected and time.monotonic() < timeout:
147-
time.sleep(0.05)
148-
logger.debug("Finished blocking gRPC state initialization")
149-
150-
if not self.connected:
151-
raise ProviderNotReadyError(
152-
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
153-
)
145+
if self.connected_event.wait(timeout=self.deadline) and self.connected:
146+
logger.debug("Finished blocking gRPC state initialization")
147+
raise ProviderNotReadyError(
148+
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
149+
)
154150

155151
def monitor(self) -> None:
156152
self.channel.subscribe(self._state_change_callback, try_to_connect=True)
@@ -186,6 +182,7 @@ def _state_change_callback(self, new_state: grpc.ChannelConnectivity) -> None:
186182
logger.debug("gRPC error timer started")
187183
self.timer.start()
188184
self.connected = False
185+
self.connected_event.clear()
189186

190187
def emit_error(self) -> None:
191188
logger.debug("gRPC error emitted")
@@ -275,6 +272,7 @@ def listen(self) -> None:
275272
context_values,
276273
)
277274
self.connected = True
275+
self.connected_event.set()
278276

279277
if not self.active:
280278
logger.debug("Terminating gRPC sync thread")

0 commit comments

Comments
 (0)