Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
from ldclient.impl.http import _http_factory
from ldclient.impl.repeating_task import RepeatingTask
from ldclient.impl.util import (
_LD_ENVID_HEADER,
_LD_FD_FALLBACK_HEADER,
UnsuccessfulResponseException,
_Fail,
_headers,
Expand Down Expand Up @@ -117,6 +119,13 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
while self._stop.is_set() is False:
result = self._requester.fetch(ss.selector())
if isinstance(result, _Fail):
fallback = None
envid = None

if result.headers is not None:
fallback = result.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
envid = result.headers.get(_LD_ENVID_HEADER)

if isinstance(result.exception, UnsuccessfulResponseException):
error_info = DataSourceErrorInfo(
kind=DataSourceErrorKind.ERROR_RESPONSE,
Expand All @@ -127,28 +136,28 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
),
)

fallback = result.exception.headers.get("X-LD-FD-Fallback") == 'true'
if fallback:
yield Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
)
break

status_code = result.exception.status
if is_http_error_recoverable(status_code):
# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.INTERRUPTED,
error=error_info,
environment_id=envid,
)
continue

# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.OFF,
error=error_info,
environment_id=envid,
)
break

Expand All @@ -159,19 +168,18 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
message=result.error,
)

# TODO(fdv2): Go has a designation here to handle JSON decoding separately.
# TODO(fdv2): Add support for environment ID
yield Update(
state=DataSourceState.INTERRUPTED,
error=error_info,
environment_id=envid,
)
else:
(change_set, headers) = result.value
yield Update(
state=DataSourceState.VALID,
change_set=change_set,
environment_id=headers.get("X-LD-EnvID"),
revert_to_fdv1=headers.get('X-LD-FD-Fallback') == 'true'
environment_id=headers.get(_LD_ENVID_HEADER),
revert_to_fdv1=headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
)

if self._event.wait(self._poll_interval):
Expand Down Expand Up @@ -208,7 +216,7 @@ def _poll(self, ss: SelectorStore) -> BasisResult:

(change_set, headers) = result.value

env_id = headers.get("X-LD-EnvID")
env_id = headers.get(_LD_ENVID_HEADER)
if not isinstance(env_id, str):
env_id = None

Expand Down Expand Up @@ -273,14 +281,14 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
),
retries=1,
)
headers = response.headers

if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
f"HTTP error {response}", UnsuccessfulResponseException(response.status),
headers=headers,
)

headers = response.headers

if response.status == 304:
return _Success(value=(ChangeSetBuilder.no_changes(), headers))

Expand All @@ -304,6 +312,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:
return _Fail(
error=changeset_result.error,
exception=changeset_result.exception,
headers=headers, # type: ignore
)


Expand Down Expand Up @@ -438,7 +447,7 @@ def fetch(self, selector: Optional[Selector]) -> PollingResult:

if response.status >= 400:
return _Fail(
f"HTTP error {response}", UnsuccessfulResponseException(response.status, response.headers)
f"HTTP error {response}", UnsuccessfulResponseException(response.status)
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
)

headers = response.headers
Expand Down
83 changes: 60 additions & 23 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
from ld_eventsource.errors import HTTPStatusError

from ldclient.config import Config
from ldclient.impl.datasystem import SelectorStore, Synchronizer, Update
from ldclient.impl.datasystem import (
DiagnosticAccumulator,
DiagnosticSource,
SelectorStore,
Synchronizer,
Update
)
from ldclient.impl.datasystem.protocolv2 import (
ChangeSetBuilder,
DeleteObject,
Expand All @@ -32,6 +38,8 @@
)
from ldclient.impl.http import HTTPFactory, _http_factory
from ldclient.impl.util import (
_LD_ENVID_HEADER,
_LD_FD_FALLBACK_HEADER,
http_error_message,
is_http_error_recoverable,
log
Expand All @@ -52,7 +60,6 @@

STREAMING_ENDPOINT = "/sdk/stream"


SseClientBuilder = Callable[[Config, SelectorStore], SSEClient]


Expand Down Expand Up @@ -98,7 +105,7 @@ def query_params() -> dict[str, str]:
)


class StreamingDataSource(Synchronizer):
class StreamingDataSource(Synchronizer, DiagnosticSource):
"""
StreamingSynchronizer is a specific type of Synchronizer that handles
streaming data sources.
Expand All @@ -112,6 +119,11 @@ def __init__(self, config: Config):
self._config = config
self._sse: Optional[SSEClient] = None
self._running = False
self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None
self._connection_attempt_start_time: Optional[float] = None

def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator):
self._diagnostic_accumulator = diagnostic_accumulator

@property
def name(self) -> str:
Expand All @@ -133,6 +145,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:

change_set_builder = ChangeSetBuilder()
self._running = True
self._connection_attempt_start_time = time()

for action in self._sse.all:
if isinstance(action, Fault):
Expand All @@ -142,37 +155,46 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
if action.error is None:
continue

(update, should_continue) = self._handle_error(action.error)
envid = action.headers.get(_LD_ENVID_HEADER) if action.headers is not None else None
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Fault object: Headers in the wrong place.

The code attempts to access action.headers on a Fault object, but Fault objects don't have a headers attribute. The headers are stored on the error object (action.error.headers), as evidenced by the fallback logic in _handle_error at line 354-355 which correctly accesses error.headers. This will cause an AttributeError when a Fault with an error containing headers is encountered.

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. Fixed.


(update, should_continue) = self._handle_error(action.error, envid)
if update is not None:
yield update

if not should_continue:
break
continue

envid = None
if isinstance(action, Start) and action.headers is not None:
fallback = action.headers.get('X-LD-FD-Fallback') == 'true'
fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true'
envid = action.headers.get(_LD_ENVID_HEADER)

if fallback:
self._record_stream_init(True)
yield Update(
state=DataSourceState.OFF,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Start Action Leaves Environment State Stale

When a Start action arrives with headers=None, the envid variable isn't updated, causing it to retain a stale value from a previous Fault action. This means subsequent event updates could incorrectly use an environment_id that was only meant for the fault's error update. The condition if isinstance(action, Start) and action.headers is not None: should update envid regardless of whether headers exist, setting it to None when headers are absent to properly reflect the new connection state.

Fix in Cursor Fix in Web

)
break

if not isinstance(action, Event):
continue

try:
update = self._process_message(action, change_set_builder)
update = self._process_message(action, change_set_builder, envid)
Comment thread
cursor[bot] marked this conversation as resolved.
if update is not None:
self._record_stream_init(False)
self._connection_attempt_start_time = None
yield update
except json.decoder.JSONDecodeError as e:
log.info(
"Error while handling stream event; will restart stream: %s", e
)
self._sse.interrupt()

(update, should_continue) = self._handle_error(e)
(update, should_continue) = self._handle_error(e, envid)
if update is not None:
yield update
if not should_continue:
Expand All @@ -189,13 +211,9 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

# TODO(sdk-1408)
# if update is not None:
# self._record_stream_init(False)

self._sse.close()

def stop(self):
Expand All @@ -207,9 +225,15 @@ def stop(self):
if self._sse:
self._sse.close()

def _record_stream_init(self, failed: bool):
if self._diagnostic_accumulator and self._connection_attempt_start_time:
current_time = int(time() * 1000)
elapsed = current_time - int(self._connection_attempt_start_time * 1000)
self._diagnostic_accumulator.record_stream_init(current_time, elapsed if elapsed >= 0 else 0, failed)

# pylint: disable=too-many-return-statements
def _process_message(
self, msg: Event, change_set_builder: ChangeSetBuilder
self, msg: Event, change_set_builder: ChangeSetBuilder, envid: Optional[str]
) -> Optional[Update]:
"""
Processes a single message from the SSE stream and returns an Update
Expand All @@ -230,7 +254,7 @@ def _process_message(
change_set_builder.expect_changes()
return Update(
state=DataSourceState.VALID,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
return None

Expand Down Expand Up @@ -276,13 +300,13 @@ def _process_message(
return Update(
state=DataSourceState.VALID,
change_set=change_set,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

log.info("Unexpected event found in stream: %s", msg.event)
return None

def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optional[Update], bool]:
"""
This method handles errors that occur during the streaming process.

Expand All @@ -301,30 +325,41 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:

if isinstance(error, json.decoder.JSONDecodeError):
log.error("Unexpected error on stream connection: %s, will retry", error)
self._record_stream_init(True)
self._connection_attempt_start_time = time() + \
self._sse.next_retry_delay # type: ignore

update = Update(
state=DataSourceState.INTERRUPTED,
error=DataSourceErrorInfo(
DataSourceErrorKind.INVALID_DATA, 0, time(), str(error)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
return (update, True)

if isinstance(error, HTTPStatusError):
self._record_stream_init(True)
self._connection_attempt_start_time = time() + \
self._sse.next_retry_delay # type: ignore

error_info = DataSourceErrorInfo(
DataSourceErrorKind.ERROR_RESPONSE,
error.status,
time(),
str(error),
)

if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true':
if envid is None and error.headers is not None:
envid = error.headers.get(_LD_ENVID_HEADER)
Comment thread
cursor[bot] marked this conversation as resolved.

if error.headers is not None and error.headers.get(_LD_FD_FALLBACK_HEADER) == 'true':
update = Update(
state=DataSourceState.OFF,
error=error_info,
revert_to_fdv1=True
revert_to_fdv1=True,
environment_id=envid,
)
return (update, False)

Expand All @@ -340,10 +375,11 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
),
error=error_info,
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)

if not is_recoverable:
self._connection_attempt_start_time = None
log.error(http_error_message_result)
self.stop()
return (update, False)
Expand All @@ -352,14 +388,16 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
return (update, True)

log.warning("Unexpected error on stream connection: %s, will retry", error)
self._record_stream_init(True)
self._connection_attempt_start_time = time() + self._sse.next_retry_delay # type: ignore

update = Update(
state=DataSourceState.INTERRUPTED,
error=DataSourceErrorInfo(
DataSourceErrorKind.UNKNOWN, 0, time(), str(error)
),
revert_to_fdv1=False,
environment_id=None, # TODO(sdk-1410)
environment_id=envid,
)
# no stacktrace here because, for a typical connection error, it'll
# just be a lengthy tour of urllib3 internals
Expand All @@ -384,5 +422,4 @@ def __init__(self, config: Config):

def build(self) -> StreamingDataSource:
"""Builds a StreamingDataSource instance with the configured parameters."""
# TODO(fdv2): Add in the other controls here.
return StreamingDataSource(self._config)
Loading