Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ jobs:
- name: run SSE contract tests
run: make run-contract-tests

- name: start async SSE contract test service
run: make start-async-contract-test-service-bg

- name: run async SSE contract tests
run: make run-async-contract-tests

windows:
runs-on: windows-latest

Expand Down
11 changes: 11 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ poetry install
eval $(poetry env activate)
```

To also install the optional async dependencies (required to use `AsyncSSEClient`):

```
poetry install --extras async
```

### Testing

To run all unit tests:
Expand All @@ -36,6 +42,11 @@ To run the standardized contract tests that are run against all LaunchDarkly SSE
make contract-tests
```

To run the same contract tests against the async implementation:
```
make async-contract-tests
```

### Linting

To run the linter and check type hints:
Expand Down
19 changes: 19 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
PYTEST_FLAGS=-W error::SyntaxWarning

TEMP_TEST_OUTPUT=/tmp/sse-contract-test-service.log
TEMP_ASYNC_TEST_OUTPUT=/tmp/sse-async-contract-test-service.log

SPHINXOPTS = -W --keep-going
SPHINXBUILD = sphinx-build
Expand Down Expand Up @@ -70,3 +71,21 @@ run-contract-tests:
.PHONY: contract-tests
contract-tests: #! Run the SSE contract test harness
contract-tests: install-contract-tests-deps start-contract-test-service-bg run-contract-tests

.PHONY: start-async-contract-test-service
start-async-contract-test-service:
@cd contract-tests && poetry run python async_service.py 8001

.PHONY: start-async-contract-test-service-bg
start-async-contract-test-service-bg:
@echo "Async test service output will be captured in $(TEMP_ASYNC_TEST_OUTPUT)"
@make start-async-contract-test-service >$(TEMP_ASYNC_TEST_OUTPUT) 2>&1 &

.PHONY: run-async-contract-tests
run-async-contract-tests:
@curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/main/downloader/run.sh \
| VERSION=v2 PARAMS="-url http://localhost:8001 -debug -stop-service-at-end" sh

.PHONY: async-contract-tests
async-contract-tests: #! Run the SSE async contract test harness
async-contract-tests: install-contract-tests-deps start-async-contract-test-service-bg run-async-contract-tests
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ This package's primary purpose is to support the [LaunchDarkly SDK for Python](h
* Setting read timeouts, custom headers, and other HTTP request properties.
* Specifying that connections should be retried under circumstances where the standard EventSource behavior would not retry them, such as if the server returns an HTTP error status.

This is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. By default, it uses `urllib3` to make HTTP requests, but it can be configured to read any input stream.
The default `SSEClient` is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. By default, it uses `urllib3` to make HTTP requests, but it can be configured to read any input stream.

An async implementation, `AsyncSSEClient`, is also available for use with `asyncio`-based applications. It uses `aiohttp` for HTTP and requires installing the optional `async` extra:

```
pip install launchdarkly-eventsource[async]
```

## Supported Python versions

Expand Down
119 changes: 119 additions & 0 deletions contract-tests/async_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import json
import logging
import os
import sys
from logging.config import dictConfig

import aiohttp.web
from async_stream_entity import AsyncStreamEntity

default_port = 8000

dictConfig({
'version': 1,
'formatters': {
'default': {
'format': '[%(asctime)s] [%(name)s] %(levelname)s: %(message)s',
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'default'
}
},
'root': {
'level': 'INFO',
'handlers': ['console']
},
})

global_log = logging.getLogger('testservice')

stream_counter = 0
streams = {}


async def handle_get_status(request):
body = {
'capabilities': [
'comments',
'headers',
'last-event-id',
'read-timeout',
]
}
return aiohttp.web.Response(
body=json.dumps(body),
content_type='application/json',
)


async def handle_delete_stop(request):
global_log.info("Test service has told us to exit")
os._exit(0)


async def handle_post_create_stream(request):
global stream_counter, streams

options = json.loads(await request.read())

stream_counter += 1
stream_id = str(stream_counter)
resource_url = '/streams/%s' % stream_id

stream = AsyncStreamEntity(options, request.app['http_session'])
streams[stream_id] = stream

return aiohttp.web.Response(status=201, headers={'Location': resource_url})


async def handle_post_stream_command(request):
stream_id = request.match_info['id']
params = json.loads(await request.read())

stream = streams.get(stream_id)
if stream is None:
return aiohttp.web.Response(status=404)
if not await stream.do_command(params.get('command')):
return aiohttp.web.Response(status=400)
return aiohttp.web.Response(status=204)


async def handle_delete_stream(request):
stream_id = request.match_info['id']

stream = streams.get(stream_id)
if stream is None:
return aiohttp.web.Response(status=404)
await stream.close()
return aiohttp.web.Response(status=204)


async def on_startup(app):
app['http_session'] = aiohttp.ClientSession()


async def on_cleanup(app):
await app['http_session'].close()


def make_app():
app = aiohttp.web.Application()
app.router.add_get('/', handle_get_status)
app.router.add_delete('/', handle_delete_stop)
app.router.add_post('/', handle_post_create_stream)
app.router.add_post('/streams/{id}', handle_post_stream_command)
app.router.add_delete('/streams/{id}', handle_delete_stream)
app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)
return app


if __name__ == "__main__":
port = default_port
if sys.argv[len(sys.argv) - 1] != 'async_service.py':
port = int(sys.argv[len(sys.argv) - 1])
global_log.info('Listening on port %d', port)
aiohttp.web.run_app(make_app(), host='0.0.0.0', port=port)
116 changes: 116 additions & 0 deletions contract-tests/async_stream_entity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import asyncio
import json
import logging
import os
import sys
import traceback

import aiohttp

# Import ld_eventsource from parent directory
sys.path.insert(1, os.path.join(sys.path[0], '..'))
from ld_eventsource.actions import Comment, Event, Fault # noqa: E402
from ld_eventsource.async_client import AsyncSSEClient # noqa: E402
from ld_eventsource.config.async_connect_strategy import \
AsyncConnectStrategy # noqa: E402
from ld_eventsource.config.error_strategy import ErrorStrategy # noqa: E402


def millis_to_seconds(t):
return None if t is None else t / 1000


class AsyncStreamEntity:
def __init__(self, options, http_session: aiohttp.ClientSession):
self.options = options
self.callback_url = options["callbackUrl"]
self.log = logging.getLogger(options["tag"])
self.closed = False
self.callback_counter = 0
self.sse = None
self._http_session = http_session
asyncio.create_task(self.run())

async def run(self):
stream_url = self.options["streamUrl"]
try:
self.log.info('Opening stream from %s', stream_url)

request_options = {}
if self.options.get("readTimeoutMs") is not None:
request_options["timeout"] = aiohttp.ClientTimeout(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, even without a read timeout, we need to customize the ClientTimeout.
image

The timeout uses a non-default ClientTimeout.

But if we do set it to anything, then the total will get set to None. https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.ClientTimeout.total

But if we set a ClientTimeout, we would get the default total=None.

sock_read=millis_to_seconds(self.options.get("readTimeoutMs"))
)

connect = AsyncConnectStrategy.http(
url=stream_url,
headers=self.options.get("headers"),
aiohttp_request_options=request_options if request_options else None,
)
sse = AsyncSSEClient(
connect,
initial_retry_delay=millis_to_seconds(self.options.get("initialDelayMs")),
last_event_id=self.options.get("lastEventId"),
error_strategy=ErrorStrategy.from_lambda(
lambda _: (
ErrorStrategy.FAIL if self.closed else ErrorStrategy.CONTINUE,
None,
)
),
logger=self.log,
)
self.sse = sse
async for item in sse.all:
if isinstance(item, Event):
self.log.info('Received event from stream (%s)', item.event)
await self.send_message(
{
'kind': 'event',
'event': {
'type': item.event,
'data': item.data,
'id': item.last_event_id,
},
}
)
elif isinstance(item, Comment):
self.log.info('Received comment from stream: %s', item.comment)
await self.send_message({'kind': 'comment', 'comment': item.comment})
elif isinstance(item, Fault):
if self.closed:
break
if item.error:
self.log.info('Received error from stream: %s', item.error)
await self.send_message({'kind': 'error', 'error': str(item.error)})
except Exception as e:
self.log.info('Received error from stream: %s', e)
self.log.info(traceback.format_exc())
await self.send_message({'kind': 'error', 'error': str(e)})

async def do_command(self, command: str) -> bool:
self.log.info('Test service sent command: %s' % command)
# currently we support no special commands
return False

async def send_message(self, message):
if self.closed:
return
self.callback_counter += 1
callback_url = "%s/%d" % (self.callback_url, self.callback_counter)
try:
async with self._http_session.post(
callback_url,
data=json.dumps(message),
headers={'Content-Type': 'application/json'},
) as resp:
if resp.status >= 300 and not self.closed:
self.log.error('Callback request returned HTTP error %d', resp.status)
except Exception as e:
if not self.closed:
self.log.error('Callback request failed: %s', e)

async def close(self):
self.closed = True
if self.sse is not None:
await self.sse.close()
self.log.info('Test ended')
3 changes: 3 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,6 @@
autodoc_default_options = {
'undoc-members': False
}

# aiohttp is an optional dependency not installed during doc builds
autodoc_mock_imports = ['aiohttp']
19 changes: 18 additions & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ LaunchDarkly Python SSE Client

This is the API reference for the `launchdarkly-eventsource <https://github.com/launchdarkly/python-eventsource/>`_ package, a `Server-Sent Events <https://html.spec.whatwg.org/multipage/server-sent-events.html>`_ client for Python. This package is used internally by the `LaunchDarkly Python SDK <https://github.com/launchdarkly/python-server-sdk>`_, but may also be useful for other purposes.


ld_eventsource module
---------------------

Expand Down Expand Up @@ -37,3 +36,21 @@ ld_eventsource.errors module
:members:
:special-members: __init__
:show-inheritance:


ld_eventsource.async_client module
-----------------------------------

.. automodule:: ld_eventsource.async_client
:members:
:special-members: __init__
:show-inheritance:


ld_eventsource.config.async_connect_strategy module
----------------------------------------------------

.. automodule:: ld_eventsource.config.async_connect_strategy
:members:
:special-members: __init__
:show-inheritance:
9 changes: 9 additions & 0 deletions ld_eventsource/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
from ld_eventsource.sse_client import *


def __getattr__(name):
# Lazily import AsyncSSEClient so that aiohttp (an optional dependency)
# is never imported for sync-only users who don't have it installed.
if name == 'AsyncSSEClient':
from ld_eventsource.async_client import AsyncSSEClient
return AsyncSSEClient
raise AttributeError(f"module 'ld_eventsource' has no attribute {name!r}")
Loading
Loading