-
Notifications
You must be signed in to change notification settings - Fork 47
chore: Support x-ld-envid in updates #370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
383a396
203a033
cc6e742
d3cb296
254009b
7e6977d
38331d6
0fb7c36
a269e0f
94c2334
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,13 @@ | |
| from ld_eventsource.errors import HTTPStatusError | ||
|
|
||
| from ldclient.config import Config | ||
| from ldclient.impl.datasystem import SelectorStore, Synchronizer, Update | ||
| from ldclient.impl.datasystem import ( | ||
| DiagnosticAccumulator, | ||
| DiagnosticSource, | ||
| SelectorStore, | ||
| Synchronizer, | ||
| Update | ||
| ) | ||
| from ldclient.impl.datasystem.protocolv2 import ( | ||
| ChangeSetBuilder, | ||
| DeleteObject, | ||
|
|
@@ -32,6 +38,8 @@ | |
| ) | ||
| from ldclient.impl.http import HTTPFactory, _http_factory | ||
| from ldclient.impl.util import ( | ||
| _LD_ENVID_HEADER, | ||
| _LD_FD_FALLBACK_HEADER, | ||
| http_error_message, | ||
| is_http_error_recoverable, | ||
| log | ||
|
|
@@ -52,7 +60,6 @@ | |
|
|
||
| STREAMING_ENDPOINT = "/sdk/stream" | ||
|
|
||
|
|
||
| SseClientBuilder = Callable[[Config, SelectorStore], SSEClient] | ||
|
|
||
|
|
||
|
|
@@ -98,7 +105,7 @@ def query_params() -> dict[str, str]: | |
| ) | ||
|
|
||
|
|
||
| class StreamingDataSource(Synchronizer): | ||
| class StreamingDataSource(Synchronizer, DiagnosticSource): | ||
| """ | ||
| StreamingSynchronizer is a specific type of Synchronizer that handles | ||
| streaming data sources. | ||
|
|
@@ -112,6 +119,11 @@ def __init__(self, config: Config): | |
| self._config = config | ||
| self._sse: Optional[SSEClient] = None | ||
| self._running = False | ||
| self._diagnostic_accumulator: Optional[DiagnosticAccumulator] = None | ||
| self._connection_attempt_start_time: Optional[float] = None | ||
|
|
||
| def set_diagnostic_accumulator(self, diagnostic_accumulator: DiagnosticAccumulator): | ||
| self._diagnostic_accumulator = diagnostic_accumulator | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
|
|
@@ -133,6 +145,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
|
|
||
| change_set_builder = ChangeSetBuilder() | ||
| self._running = True | ||
| self._connection_attempt_start_time = time() | ||
|
|
||
| for action in self._sse.all: | ||
| if isinstance(action, Fault): | ||
|
|
@@ -142,37 +155,46 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| if action.error is None: | ||
| continue | ||
|
|
||
| (update, should_continue) = self._handle_error(action.error) | ||
| envid = action.headers.get(_LD_ENVID_HEADER) if action.headers is not None else None | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Fault object: Headers in the wrong place.The code attempts to access
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good point. Fixed. |
||
|
|
||
| (update, should_continue) = self._handle_error(action.error, envid) | ||
| if update is not None: | ||
| yield update | ||
|
|
||
| if not should_continue: | ||
| break | ||
| continue | ||
|
|
||
| envid = None | ||
| if isinstance(action, Start) and action.headers is not None: | ||
| fallback = action.headers.get('X-LD-FD-Fallback') == 'true' | ||
| fallback = action.headers.get(_LD_FD_FALLBACK_HEADER) == 'true' | ||
| envid = action.headers.get(_LD_ENVID_HEADER) | ||
|
|
||
| if fallback: | ||
| self._record_stream_init(True) | ||
| yield Update( | ||
| state=DataSourceState.OFF, | ||
| revert_to_fdv1=True | ||
| revert_to_fdv1=True, | ||
| environment_id=envid, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Start Action Leaves Environment State StaleWhen a |
||
| ) | ||
| break | ||
|
|
||
| if not isinstance(action, Event): | ||
| continue | ||
|
|
||
| try: | ||
| update = self._process_message(action, change_set_builder) | ||
| update = self._process_message(action, change_set_builder, envid) | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| if update is not None: | ||
| self._record_stream_init(False) | ||
| self._connection_attempt_start_time = None | ||
| yield update | ||
| except json.decoder.JSONDecodeError as e: | ||
| log.info( | ||
| "Error while handling stream event; will restart stream: %s", e | ||
| ) | ||
| self._sse.interrupt() | ||
|
|
||
| (update, should_continue) = self._handle_error(e) | ||
| (update, should_continue) = self._handle_error(e, envid) | ||
| if update is not None: | ||
| yield update | ||
| if not should_continue: | ||
|
|
@@ -189,13 +211,9 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]: | |
| DataSourceErrorKind.UNKNOWN, 0, time(), str(e) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| # TODO(sdk-1408) | ||
| # if update is not None: | ||
| # self._record_stream_init(False) | ||
|
|
||
| self._sse.close() | ||
|
|
||
| def stop(self): | ||
|
|
@@ -207,9 +225,15 @@ def stop(self): | |
| if self._sse: | ||
| self._sse.close() | ||
|
|
||
| def _record_stream_init(self, failed: bool): | ||
| if self._diagnostic_accumulator and self._connection_attempt_start_time: | ||
| current_time = int(time() * 1000) | ||
| elapsed = current_time - int(self._connection_attempt_start_time * 1000) | ||
| self._diagnostic_accumulator.record_stream_init(current_time, elapsed if elapsed >= 0 else 0, failed) | ||
|
|
||
| # pylint: disable=too-many-return-statements | ||
| def _process_message( | ||
| self, msg: Event, change_set_builder: ChangeSetBuilder | ||
| self, msg: Event, change_set_builder: ChangeSetBuilder, envid: Optional[str] | ||
| ) -> Optional[Update]: | ||
| """ | ||
| Processes a single message from the SSE stream and returns an Update | ||
|
|
@@ -230,7 +254,7 @@ def _process_message( | |
| change_set_builder.expect_changes() | ||
| return Update( | ||
| state=DataSourceState.VALID, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| return None | ||
|
|
||
|
|
@@ -276,13 +300,13 @@ def _process_message( | |
| return Update( | ||
| state=DataSourceState.VALID, | ||
| change_set=change_set, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| log.info("Unexpected event found in stream: %s", msg.event) | ||
| return None | ||
|
|
||
| def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | ||
| def _handle_error(self, error: Exception, envid: Optional[str]) -> Tuple[Optional[Update], bool]: | ||
| """ | ||
| This method handles errors that occur during the streaming process. | ||
|
|
||
|
|
@@ -301,30 +325,41 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
|
|
||
| if isinstance(error, json.decoder.JSONDecodeError): | ||
| log.error("Unexpected error on stream connection: %s, will retry", error) | ||
| self._record_stream_init(True) | ||
| self._connection_attempt_start_time = time() + \ | ||
| self._sse.next_retry_delay # type: ignore | ||
|
|
||
| update = Update( | ||
| state=DataSourceState.INTERRUPTED, | ||
| error=DataSourceErrorInfo( | ||
| DataSourceErrorKind.INVALID_DATA, 0, time(), str(error) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| return (update, True) | ||
|
|
||
| if isinstance(error, HTTPStatusError): | ||
| self._record_stream_init(True) | ||
| self._connection_attempt_start_time = time() + \ | ||
| self._sse.next_retry_delay # type: ignore | ||
|
|
||
| error_info = DataSourceErrorInfo( | ||
| DataSourceErrorKind.ERROR_RESPONSE, | ||
| error.status, | ||
| time(), | ||
| str(error), | ||
| ) | ||
|
|
||
| if error.headers is not None and error.headers.get("X-LD-FD-Fallback") == 'true': | ||
| if envid is None and error.headers is not None: | ||
| envid = error.headers.get(_LD_ENVID_HEADER) | ||
|
cursor[bot] marked this conversation as resolved.
|
||
|
|
||
| if error.headers is not None and error.headers.get(_LD_FD_FALLBACK_HEADER) == 'true': | ||
| update = Update( | ||
| state=DataSourceState.OFF, | ||
| error=error_info, | ||
| revert_to_fdv1=True | ||
| revert_to_fdv1=True, | ||
| environment_id=envid, | ||
| ) | ||
| return (update, False) | ||
|
|
||
|
|
@@ -340,10 +375,11 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| ), | ||
| error=error_info, | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
|
|
||
| if not is_recoverable: | ||
| self._connection_attempt_start_time = None | ||
| log.error(http_error_message_result) | ||
| self.stop() | ||
| return (update, False) | ||
|
|
@@ -352,14 +388,16 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]: | |
| return (update, True) | ||
|
|
||
| log.warning("Unexpected error on stream connection: %s, will retry", error) | ||
| self._record_stream_init(True) | ||
| self._connection_attempt_start_time = time() + self._sse.next_retry_delay # type: ignore | ||
|
|
||
| update = Update( | ||
| state=DataSourceState.INTERRUPTED, | ||
| error=DataSourceErrorInfo( | ||
| DataSourceErrorKind.UNKNOWN, 0, time(), str(error) | ||
| ), | ||
| revert_to_fdv1=False, | ||
| environment_id=None, # TODO(sdk-1410) | ||
| environment_id=envid, | ||
| ) | ||
| # no stacktrace here because, for a typical connection error, it'll | ||
| # just be a lengthy tour of urllib3 internals | ||
|
|
@@ -384,5 +422,4 @@ def __init__(self, config: Config): | |
|
|
||
| def build(self) -> StreamingDataSource: | ||
| """Builds a StreamingDataSource instance with the configured parameters.""" | ||
| # TODO(fdv2): Add in the other controls here. | ||
| return StreamingDataSource(self._config) | ||
Uh oh!
There was an error while loading. Please reload this page.