Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions acouchbase_analytics/protocol/_core/anyio_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ def current_async_library() -> Optional[AsyncBackend]:
except ImportError:
async_lib = 'asyncio'

# TODO: This helps make tests work.
# Should we work through the scenario when sniffio cannot find the async library?
try:
async_lib = sniffio.current_async_library()
except sniffio.AsyncLibraryNotFoundError:
Expand All @@ -62,7 +60,7 @@ def current_async_library() -> Optional[AsyncBackend]:
if async_lib not in ('asyncio', 'trio'):
raise RuntimeError('Running under an unsupported async environment.')

# TODO: confirm trio support
# TODO(PYCO-71): Add trio support
if async_lib == 'trio':
raise RuntimeError('trio currently not supported')

Expand Down
2 changes: 1 addition & 1 deletion acouchbase_analytics/protocol/_core/async_json_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async def _process_token_stream(self) -> None:
try:
_, event, value = await self._json_stream_parser.__anext__() # type: ignore[attr-defined]
# this is a hack b/c the ijson.parse_async iterator does not yield to the event loop
# TODO: create PYCO to either build custom JSON parsing, or dig into ijson root cause
# TODO(PYCO-74): create PYCO to either build custom JSON parsing, or dig into ijson root cause
await self._json_token_parser.parse_token(event, value)
except StopAsyncIteration:
self._token_stream_exhausted = True
Expand Down
5 changes: 0 additions & 5 deletions acouchbase_analytics/protocol/_core/net_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ async def get_request_ip_async(host: str, port: int, logger_handler: Optional[Ca
except ValueError:
ip = None

# if we have localhost, httpx does not seem to be able to resolve IPv6 localhost (::1) properly
# TODO: IPv6 support for localhost??
if host == 'localhost':
ip = '127.0.0.1'

if not ip:
result = await anyio.getaddrinfo(host, port, type=socket.SOCK_STREAM, family=socket.AF_UNSPEC)
res_ip = choice([addr[4][0] for addr in result]) # nosec B311
Expand Down
12 changes: 2 additions & 10 deletions acouchbase_analytics/protocol/_core/request_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@


class AsyncRequestContext:
# TODO: AsyncExitStack??
# https://anyio.readthedocs.io/en/stable/cancellation.html

def __init__(
self,
client_adapter: _AsyncClientAdapter,
Expand Down Expand Up @@ -192,7 +189,6 @@ async def _trace_handler(self, event_name: str, _: str) -> None:
self._cancel_scope_deadline_updated = True

def _update_cancel_scope_deadline(self, deadline: float, is_absolute: Optional[bool] = False) -> None:
# TODO: confirm scenario of get_time() < self._taskgroup.cancel_scope.deadline is handled by anyio
new_deadline = deadline if is_absolute else get_time() + deadline
current_time = get_time()
if current_time >= new_deadline:
Expand Down Expand Up @@ -235,10 +231,6 @@ def create_response_task(self, fn: Callable[..., Coroutine[Any, Any, Any]], *arg
raise RuntimeError('Async backend loop is not initialized.')
task_name = f'{self._id}-response-task'
task: Task[Any] = self._backend.loop.create_task(fn(*args), name=task_name)
# TODO: Confirm if callback is useful/necessary;
# def task_done(t: Task[Any]) -> None:
# print(f'Task done callback task=({t.get_name()}); done: {t.done()}, cancelled: {t.cancelled()}')
# task.add_done_callback(task_done)
self._response_task = task
return task

Expand Down Expand Up @@ -416,7 +408,7 @@ async def shutdown(

def start_stream(self, core_response: HttpCoreResponse) -> None:
if hasattr(self, '_json_stream'):
# TODO: logging; I don't think this is an error...
self.log_message('JSON stream already exists', LogLevel.WARNING)
return

self._json_stream = AsyncJsonStream(
Expand All @@ -437,6 +429,7 @@ async def __aenter__(self) -> AsyncRequestContext:
await self._taskgroup.__aenter__()
return self

# TODO(PYCO-72): Possible improvement to handling async RequestContext.__aexit__
async def __aexit__(
self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType]
) -> Optional[bool]:
Expand All @@ -447,5 +440,4 @@ async def __aexit__(
finally:
self._maybe_set_request_error(exc_type, exc_val)
del self._taskgroup
# TODO: should we suppress here (e.g., return True)
return None # noqa: B012
8 changes: 5 additions & 3 deletions acouchbase_analytics/protocol/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from acouchbase_analytics.protocol._core.client_adapter import _AsyncClientAdapter
from acouchbase_analytics.protocol._core.request_context import AsyncRequestContext
from acouchbase_analytics.protocol.streaming import AsyncHttpStreamingResponse
from couchbase_analytics.common.logging import LogLevel
from couchbase_analytics.common.result import AsyncQueryResult
from couchbase_analytics.protocol._core.request import _RequestBuilder

Expand Down Expand Up @@ -92,12 +93,13 @@ async def shutdown(self) -> None:
if self.has_client:
await self._shutdown()
else:
# TODO: log warning
print('Cluster does not have a connection. Ignoring')
self.client_adapter.log_message('Cluster does not have a connection. Ignoring shutdown.', LogLevel.WARNING)

async def _execute_query(self, http_resp: AsyncHttpStreamingResponse) -> AsyncQueryResult:
if not self.has_client:
# TODO: add log message??
self.client_adapter.log_message(
'Cluster does not have a connection. Creating the client.', LogLevel.WARNING
)
await self._create_client()
await http_resp.send_request()
return AsyncQueryResult(http_resp)
Expand Down
5 changes: 4 additions & 1 deletion acouchbase_analytics/protocol/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from acouchbase_analytics.protocol._core.client_adapter import _AsyncClientAdapter
from acouchbase_analytics.protocol._core.request_context import AsyncRequestContext
from acouchbase_analytics.protocol.streaming import AsyncHttpStreamingResponse
from couchbase_analytics.common.logging import LogLevel
from couchbase_analytics.common.result import AsyncQueryResult
from couchbase_analytics.protocol._core.request import _RequestBuilder

Expand Down Expand Up @@ -63,7 +64,9 @@ async def _create_client(self) -> None:

async def _execute_query(self, http_resp: AsyncHttpStreamingResponse) -> AsyncQueryResult:
if not self.client_adapter.has_client:
# TODO: add log message??
self.client_adapter.log_message(
'Cluster does not have a connection. Creating the client.', LogLevel.WARNING
)
await self._create_client()
await http_resp.send_request()
return AsyncQueryResult(http_resp)
Expand Down
2 changes: 1 addition & 1 deletion couchbase_analytics/common/_core/duration_str_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from couchbase_analytics.common._core.utils import is_null_or_empty

# TODO: Apparently Go does not allow a leading decimal point without a leading zero, e.g., ".5s" is invalid.
# NOTE: Apparently Go does not allow a leading decimal point without a leading zero, e.g., ".5s" is invalid.
# We allowed this in the Columnar SDK due to how the C++ client parsed durations
DURATION_PATTERN = re.compile(r'^([-+]?)((\d*(\.\d*)?){1}(?:ns|us|µs|μs|ms|s|m|h){1})+$')
DURATION_PAIRS_PATTERN = re.compile(r'(\d*(?:\.\d*)?)(ns|us|ms|s|m|h)')
Expand Down
1 change: 0 additions & 1 deletion couchbase_analytics/common/_core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def build_query_metadata(json_data: Optional[Any] = None, raw_metadata: Optional
'warnings': warnings,
}

# TODO: include status in metadata?? Seems to only be populated in error scenario
if 'status' in json_data:
metadata['status'] = json_data.get('status', '')

Expand Down
1 change: 0 additions & 1 deletion couchbase_analytics/common/_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def __call__(self, value: Any) -> str:

if isinstance(value, str):
if value in (x.value for x in expected_type):
# TODO: use warning -- maybe don't want to allow str representation?
return value
raise ValueError(f"Invalid str representation of {expected_type}. Received '{value}'.")

Expand Down
5 changes: 0 additions & 5 deletions couchbase_analytics/protocol/_core/net_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ def get_request_ip(host: str, port: int, logger_handler: Optional[Callable[...,
except ValueError:
ip = None

# if we have localhost, httpx does not seem to be able to resolve IPv6 localhost (::1) properly
# TODO: IPv6 support for localhost??
if host == 'localhost':
ip = '127.0.0.1'

if not ip:
result = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM, family=socket.AF_UNSPEC)
res_ip = choice([addr[4][0] for addr in result]) # nosec B311
Expand Down
1 change: 0 additions & 1 deletion couchbase_analytics/protocol/_core/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ def __init__(
self._extensions: RequestExtensions = {
'timeout': {'pool': connect_timeout, 'connect': connect_timeout, 'read': self._default_query_timeout}
}
# TODO: warning if we have a secure connection, but the sni_hostname is not set?
if self._conn_details.is_secure() and self._conn_details.sni_hostname is not None:
self._extensions['sni_hostname'] = self._conn_details.sni_hostname

Expand Down
31 changes: 5 additions & 26 deletions couchbase_analytics/protocol/_core/request_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import math
import time
from concurrent.futures import CancelledError, Future, ThreadPoolExecutor
from threading import Event, Lock
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional, Union
from threading import Event
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
from uuid import uuid4

from httpx import Response as HttpCoreResponse
Expand All @@ -27,27 +27,6 @@
from couchbase_analytics.protocol._core.request import QueryRequest


# TODO: might not be needed; need to validate httpx iterator behavior
class ThreadSafeBytesIterator:
def __init__(self, iterator: Iterator[bytes]):
if not hasattr(iterator, '__next__'):
raise TypeError('Provided object is not an iterator (missing __next__ method).')
self._iterator = iterator
self._lock = Lock()

def __iter__(self) -> ThreadSafeBytesIterator:
return self

def __next__(self) -> bytes:
with self._lock: # Acquire the lock before accessing the iterator
try:
item = next(self._iterator)
return item
except StopIteration:
# Always re-raise StopIteration to signal the end of iteration
raise


class BackgroundRequest:
def __init__(
self, bg_future: Future[BlockingQueryResult], user_future: Future[BlockingQueryResult], cancel_event: Event
Expand Down Expand Up @@ -185,7 +164,7 @@ def _check_cancelled_or_timed_out(self) -> None:
self._request_state = RequestState.Timeout

def _create_stage_notification_future(self) -> None:
# TODO: custom ThreadPoolExecutor, to get a "plain" future
# TODO(PYCO-75): custom ThreadPoolExecutor, to get a "plain" future
if self._stage_notification_ft is not None:
raise RuntimeError('Stage notification future already created for this context.')
self._stage_notification_ft = Future[ParsedResultType]()
Expand Down Expand Up @@ -407,7 +386,7 @@ def send_request_in_background(
) -> Future[BlockingQueryResult]:
if self._background_request is not None:
raise RuntimeError('Background reqeust already created for this context.')
# TODO: custom ThreadPoolExecutor, to get a "plain" future
# TODO(PYCO-75): custom ThreadPoolExecutor, to get a "plain" future
user_ft = Future[BlockingQueryResult]()
background_work_ft = self._tp_executor.submit(fn, *args)
self._background_request = BackgroundRequest(background_work_ft, user_ft, self._cancel_event)
Expand Down Expand Up @@ -441,7 +420,7 @@ def start_stream(self, core_response: HttpCoreResponse) -> None:
self.log_message('JSON stream already exists', LogLevel.WARNING)
return

# TODO: need to confirm if the httpx Response iterator is thread-safe
# TODO(PYCO-73): Potentially use new iterator if problems w/ httpx
self._json_stream = JsonStream(
core_response.iter_bytes(), stream_config=self._stream_config, logger_handler=self.log_message
)
Expand Down
2 changes: 1 addition & 1 deletion couchbase_analytics/protocol/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(
self._client_adapter = _ClientAdapter(http_endpoint, credential, options, **kwargs)
self._request_builder = _RequestBuilder(self._client_adapter)
self._create_client()
# TODO: make a custom ThreadPoolExecutor, so that we can override submit and have a way to get
# TODO(PYCO-75): make a custom ThreadPoolExecutor, so that we can override submit and have a way to get
# a "plain" future as the docs say we should create a future via an executor
# The RequestContext generates a future that enables some background processing
# Allow the default max_workers which is (as of Python 3.8): min(32, os.cpu_count() + 4).
Expand Down
12 changes: 7 additions & 5 deletions couchbase_analytics/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ def parse_query_string_value(value: List[str], enforce_str: Optional[bool] = Fal
return v


def parse_query_str_options(query_str_opts: Dict[str, List[str]]) -> Dict[str, QueryStrVal]:
def parse_query_str_options(
query_str_opts: Dict[str, List[str]], logger_name: Optional[str] = None
) -> Dict[str, QueryStrVal]:
final_opts: Dict[str, QueryStrVal] = {}
for k, v in query_str_opts.items():
tokens = k.split('.')
Expand All @@ -131,8 +133,9 @@ def parse_query_str_options(query_str_opts: Dict[str, List[str]]) -> Dict[str, Q
val = parse_query_string_value(v, enforce_str=True)
final_opts[tokens[1]] = parse_duration_str(cast(str, val))
else:
print('Warning: Unrecognized query string option:', k)
# TODO: exceptions -- this means the user passed in an invalid option
if logger_name is not None:
logger = logging.getLogger(logger_name)
logger.warning(f'Unrecognized query string option: {k}')
pass
else:
if k in SecurityOptions.VALID_OPTION_KEYS:
Expand Down Expand Up @@ -189,7 +192,6 @@ def is_secure(self) -> bool:

def validate_security_options(self) -> None: # noqa: C901
security_opts: Optional[SecurityOptionsTransformedKwargs] = self.cluster_options.get('security_options')
# TODO: security settings
if security_opts is not None:
# separate between value options and boolean option (trust_only_capella)
solo_security_opts = ['trust_only_pem_file', 'trust_only_pem_str', 'trust_only_certificates']
Expand Down Expand Up @@ -256,7 +258,7 @@ def create(
ClusterOptionsTransformedKwargs,
kwargs,
options,
query_str_opts=parse_query_str_options(query_str_opts),
query_str_opts=parse_query_str_options(query_str_opts, logger_name=logger_name),
)

default_deserializer = cluster_opts.pop('deserializer', None)
Expand Down
2 changes: 1 addition & 1 deletion couchbase_analytics/tests/duration_parsing_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_invalid_durations(self, duration: str) -> None:
('1.234h', 1.234 * 3.6e6),
('1h30m0s', 5.4e6),
('0.1h10m', 9.6e5),
# TODO: apparently this is invalid in Go, but was okay w/ C++ implementation
# NOTE: apparently this is invalid in Go, but was okay w/ C++ implementation
('.1h10m', 9.6e5),
('0001h00010m', 4.2e6),
('100ns', 1e-4),
Expand Down