Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions obstore/python/obstore/_store/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ from ._azure import AzureCredentialProvider as AzureCredentialProvider
from ._azure import AzureSASToken as AzureSASToken
from ._azure import AzureStore as AzureStore
from ._client import ClientConfig as ClientConfig
from ._client import ClientFactory as ClientFactory
from ._client import HttpRequest as HttpRequest
from ._client import HttpResponse as HttpResponse
from ._client import HttpService as HttpService
from ._gcs import GCSConfig as GCSConfig
from ._gcs import GCSCredential as GCSCredential
from ._gcs import GCSCredentialProvider as GCSCredentialProvider
Expand All @@ -42,6 +46,7 @@ def from_url(
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: S3CredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[S3Config],
) -> ObjectStore: ...
@overload
Expand All @@ -52,6 +57,7 @@ def from_url(
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: GCSCredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[GCSConfig],
) -> ObjectStore: ...
@overload
Expand All @@ -62,6 +68,7 @@ def from_url(
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: AzureCredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[AzureConfig],
) -> ObjectStore: ...
@overload
Expand All @@ -73,6 +80,7 @@ def from_url(
retry_config: None = None,
automatic_cleanup: bool = False,
mkdir: bool = False,
client_factory: ClientFactory | None = None,
) -> ObjectStore: ...
def from_url( # type: ignore[misc] # docstring in pyi file
url: str,
Expand All @@ -81,6 +89,7 @@ def from_url( # type: ignore[misc] # docstring in pyi file
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: Callable | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Any,
) -> ObjectStore:
"""Easy construction of store by URL, identifying the relevant store.
Expand Down Expand Up @@ -120,6 +129,7 @@ def from_url( # type: ignore[misc] # docstring in pyi file
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
credential_provider: A callback to provide custom credentials to the underlying store classes.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.
kwargs: per-store configuration passed down to store-specific builders.

"""
Expand Down
6 changes: 5 additions & 1 deletion obstore/python/obstore/_store/_aws.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ from collections.abc import Coroutine
from datetime import datetime
from typing import Any, Literal, Protocol, TypedDict

from ._client import ClientConfig
from ._client import ClientConfig, ClientFactory
from ._retry import RetryConfig

if sys.version_info >= (3, 10):
Expand Down Expand Up @@ -476,6 +476,7 @@ class S3Store:
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: S3CredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[S3Config], # type: ignore # noqa: PGH003 (bucket key overlaps with positional arg)
) -> None:
"""Create a new S3Store.
Expand All @@ -489,6 +490,7 @@ class S3Store:
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
credential_provider: A callback to provide custom S3 credentials.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.
kwargs: AWS configuration values. Supports the same values as `config`, but as named keyword args.

Returns:
Expand All @@ -504,6 +506,7 @@ class S3Store:
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: S3CredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[S3Config],
) -> Self:
"""Parse available connection info from a well-known storage URL.
Expand All @@ -529,6 +532,7 @@ class S3Store:
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
credential_provider: A callback to provide custom S3 credentials.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.
kwargs: AWS configuration values. Supports the same values as `config`, but as named keyword args.


Expand Down
6 changes: 5 additions & 1 deletion obstore/python/obstore/_store/_azure.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ from collections.abc import Coroutine
from datetime import datetime
from typing import Any, Protocol, TypedDict

from ._client import ClientConfig
from ._client import ClientConfig, ClientFactory
from ._retry import RetryConfig

if sys.version_info >= (3, 10):
Expand Down Expand Up @@ -332,6 +332,7 @@ class AzureStore:
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: AzureCredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[AzureConfig], # type: ignore # noqa: PGH003 (container_name key overlaps with positional arg)
) -> None:
"""Construct a new AzureStore.
Expand All @@ -345,6 +346,7 @@ class AzureStore:
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
credential_provider: A callback to provide custom Azure credentials.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.
kwargs: Azure configuration values. Supports the same values as `config`, but as named keyword args.

Returns:
Expand All @@ -362,6 +364,7 @@ class AzureStore:
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: AzureCredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[AzureConfig],
) -> Self:
"""Construct a new AzureStore with values populated from a well-known storage URL.
Expand Down Expand Up @@ -396,6 +399,7 @@ class AzureStore:
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
credential_provider: A callback to provide custom Azure credentials.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.
kwargs: Azure configuration values. Supports the same values as `config`, but as named keyword args.

Returns:
Expand Down
27 changes: 26 additions & 1 deletion obstore/python/obstore/_store/_client.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from collections.abc import AsyncIterable, Buffer, Iterable
from datetime import timedelta
from typing import TypedDict
from http import HTTPMethod, HTTPStatus
from typing import Literal, Protocol, TypedDict
from urllib.parse import ParseResult

class ClientConfig(TypedDict, total=False):
"""HTTP client configuration.
Expand Down Expand Up @@ -85,3 +88,25 @@ class ClientConfig(TypedDict, total=False):
"""
user_agent: str
"""User-Agent header to be used by this client."""

class HttpRequest(TypedDict):
method: HTTPMethod
uri: ParseResult
version: Literal["0.9", "1.0", "1.1", "2.0", "3.0"]
headers: Iterable[tuple[str, bytes]]
body: Buffer | None

class HttpResponse(Protocol):
status: int | HTTPStatus
version: Literal["0.9", "1.0", "1.1", "2.0", "3.0"]
headers: Iterable[tuple[str, str | bytes]]
# TODO: not sure yet what body will look like
body: AsyncIterable[Buffer]

# This maps to what is called PyHttpConnector in Rust
class ClientFactory(Protocol):
def connect(self, options: ClientConfig) -> HttpService: ...

class HttpService(Protocol):
async def __call__(self, req: HttpRequest) -> HttpResponse:
"""Perform the given `HttpRequest`, returning an `HttpResponse`."""
6 changes: 5 additions & 1 deletion obstore/python/obstore/_store/_gcs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ from collections.abc import Coroutine
from datetime import datetime
from typing import Any, Protocol, TypedDict

from ._client import ClientConfig
from ._client import ClientConfig, ClientFactory
from ._retry import RetryConfig

if sys.version_info >= (3, 11):
Expand Down Expand Up @@ -146,6 +146,7 @@ class GCSStore:
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: GCSCredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[GCSConfig], # type: ignore # noqa: PGH003 (bucket key overlaps with positional arg)
) -> None:
"""Construct a new GCSStore.
Expand All @@ -159,6 +160,7 @@ class GCSStore:
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
credential_provider: A callback to provide custom Google credentials.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.
kwargs: GCS configuration values. Supports the same values as `config`, but as named keyword args.

Returns:
Expand All @@ -176,6 +178,7 @@ class GCSStore:
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
credential_provider: GCSCredentialProvider | None = None,
client_factory: ClientFactory | None = None,
**kwargs: Unpack[GCSConfig],
) -> Self:
"""Construct a new GCSStore with values populated from a well-known storage URL.
Expand All @@ -198,6 +201,7 @@ class GCSStore:
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
credential_provider: A callback to provide custom Google credentials.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.
kwargs: GCS configuration values. Supports the same values as `config`, but as named keyword args.

Returns:
Expand Down
5 changes: 4 additions & 1 deletion obstore/python/obstore/_store/_http.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys

from ._client import ClientConfig
from ._client import ClientConfig, ClientFactory
from ._retry import RetryConfig

if sys.version_info >= (3, 11):
Expand All @@ -17,6 +17,7 @@ class HTTPStore:
*,
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
client_factory: ClientFactory | None = None,
) -> None:
"""Construct a new HTTPStore from a URL.

Expand All @@ -31,6 +32,7 @@ class HTTPStore:
Keyword Args:
client_options: HTTP Client options. Defaults to None.
retry_config: Retry configuration. Defaults to None.
client_factory: A custom HTTP client factory to use for requests. Defaults to None, which uses the Rust `reqwest` library to handle HTTP requests.

Returns:
HTTPStore
Expand All @@ -44,6 +46,7 @@ class HTTPStore:
*,
client_options: ClientConfig | None = None,
retry_config: RetryConfig | None = None,
client_factory: ClientFactory | None = None,
) -> Self:
"""Construct a new HTTPStore from a URL.

Expand Down
Empty file.
102 changes: 102 additions & 0 deletions obstore/python/obstore/client/aiohttp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
"""Aiohttp client implementation."""

# ruff: noqa: PLR2004

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal
from weakref import WeakSet

from aiohttp import ClientSession
from multidict import MultiDict

if TYPE_CHECKING:
from collections.abc import AsyncIterable, Iterable

from aiohttp import ClientResponse

from obstore.store import ClientConfig, ClientFactory, HttpRequest, HttpResponse


class AiohttpClientFactory:
"""A client factory for Aiohttp."""

_sessions: WeakSet[ClientSession]

def __init__(self) -> None:
"""Create a new AiohttpClientFactory."""
self._sessions = WeakSet()

def connect(self, options: ClientConfig) -> _AiohttpService: # noqa: ARG002
"""Create a new HTTP Client."""
session = ClientSession()
self._sessions.add(session)
return _AiohttpService(session)

async def close_all(self) -> None:
"""Close all generated aiohttp ClientSession instances."""
futs = [session.close() for session in self._sessions]
await asyncio.gather(*futs)


class _AiohttpService:
session: ClientSession

def __init__(self, session: ClientSession) -> None:
self.session = session

async def __call__(self, req: HttpRequest) -> HttpResponse:
method = req["method"]
url = req["uri"].geturl()

headers: MultiDict[str] = MultiDict()
for header_name, header_value in req["headers"]:
# TODO: it seems like aiohttp only allows string header values?
headers.add(header_name, str(header_value))

async with self.session.request(method, url, headers=headers) as resp:
version = _get_http_version_from_response(resp)
return _AiohttpResponse(
status=resp.status,
version=version,
headers=resp.headers.items(),
body=resp.content,
)


def _get_http_version_from_response(
resp: ClientResponse,
) -> Literal["0.9", "1.0", "1.1", "2.0", "3.0"]:
v = resp.version
if v is not None:
if v.major == 0 and v.minor == 9:
return "0.9"
if v.major == 1 and v.minor == 0:
return "1.0"
if v.major == 1 and v.minor == 1:
return "1.1"
if v.major == 2 and v.minor == 0:
return "2.0"
if v.major == 3 and v.minor == 0:
return "3.0"

return "1.1"


@dataclass
class _AiohttpResponse:
status: int
version: Literal["0.9", "1.0", "1.1", "2.0", "3.0"]
headers: Iterable[tuple[str, str | bytes]]
body: AsyncIterable


if TYPE_CHECKING:
# Just for testing
def _accepts_factory(factory: ClientFactory) -> None:
pass

aiohttp_factory = AiohttpClientFactory()
_accepts_factory(aiohttp_factory)
8 changes: 8 additions & 0 deletions obstore/python/obstore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@
AzureSASToken, # noqa: TC004
BackoffConfig, # noqa: TC004
ClientConfig, # noqa: TC004
ClientFactory, # noqa: TC004
GCSConfig, # noqa: TC004
GCSCredential, # noqa: TC004
GCSCredentialProvider, # noqa: TC004
HttpRequest, # noqa: TC004
HttpResponse, # noqa: TC004
HttpService, # noqa: TC004
RetryConfig, # noqa: TC004
S3Config, # noqa: TC004
S3Credential, # noqa: TC004
Expand Down Expand Up @@ -82,11 +86,15 @@
"AzureStore",
"BackoffConfig",
"ClientConfig",
"ClientFactory",
"GCSConfig",
"GCSCredential",
"GCSCredentialProvider",
"GCSStore",
"HTTPStore",
"HttpRequest",
"HttpResponse",
"HttpService",
"LocalStore",
"MemoryStore",
"RetryConfig",
Expand Down
Loading
Loading