From 2e5af752afbeeb7660e9d45a7a614db590a7bfb8 Mon Sep 17 00:00:00 2001 From: Jared Casey Date: Fri, 1 Aug 2025 14:45:41 -0600 Subject: [PATCH] PYCO-60: Cleanup TODOs Changes ------- * Address remaining TODOs with either log message, PYCO ticket or remove --- .../protocol/_core/anyio_utils.py | 4 +-- .../protocol/_core/async_json_stream.py | 2 +- .../protocol/_core/net_utils.py | 5 --- .../protocol/_core/request_context.py | 12 ++----- acouchbase_analytics/protocol/cluster.py | 8 +++-- acouchbase_analytics/protocol/scope.py | 5 ++- .../common/_core/duration_str_utils.py | 2 +- couchbase_analytics/common/_core/query.py | 1 - couchbase_analytics/common/_core/utils.py | 1 - .../protocol/_core/net_utils.py | 5 --- couchbase_analytics/protocol/_core/request.py | 1 - .../protocol/_core/request_context.py | 31 +++---------------- couchbase_analytics/protocol/cluster.py | 2 +- couchbase_analytics/protocol/connection.py | 12 ++++--- .../tests/duration_parsing_t.py | 2 +- 15 files changed, 28 insertions(+), 65 deletions(-) diff --git a/acouchbase_analytics/protocol/_core/anyio_utils.py b/acouchbase_analytics/protocol/_core/anyio_utils.py index 3a29499..679a055 100644 --- a/acouchbase_analytics/protocol/_core/anyio_utils.py +++ b/acouchbase_analytics/protocol/_core/anyio_utils.py @@ -52,8 +52,6 @@ def current_async_library() -> Optional[AsyncBackend]: except ImportError: async_lib = 'asyncio' - # TODO: This helps make tests work. - # Should we work through the scenario when sniffio cannot find the async library? try: async_lib = sniffio.current_async_library() except sniffio.AsyncLibraryNotFoundError: @@ -62,7 +60,7 @@ def current_async_library() -> Optional[AsyncBackend]: if async_lib not in ('asyncio', 'trio'): raise RuntimeError('Running under an unsupported async environment.') - # TODO: confirm trio support + # TODO(PYCO-71): Add trio support if async_lib == 'trio': raise RuntimeError('trio currently not supported') diff --git a/acouchbase_analytics/protocol/_core/async_json_stream.py b/acouchbase_analytics/protocol/_core/async_json_stream.py index 474638e..9873503 100644 --- a/acouchbase_analytics/protocol/_core/async_json_stream.py +++ b/acouchbase_analytics/protocol/_core/async_json_stream.py @@ -136,7 +136,7 @@ async def _process_token_stream(self) -> None: try: _, event, value = await self._json_stream_parser.__anext__() # type: ignore[attr-defined] # this is a hack b/c the ijson.parse_async iterator does not yield to the event loop - # TODO: create PYCO to either build custom JSON parsing, or dig into ijson root cause + # TODO(PYCO-74): create PYCO to either build custom JSON parsing, or dig into ijson root cause await self._json_token_parser.parse_token(event, value) except StopAsyncIteration: self._token_stream_exhausted = True diff --git a/acouchbase_analytics/protocol/_core/net_utils.py b/acouchbase_analytics/protocol/_core/net_utils.py index cc75af7..59a96df 100644 --- a/acouchbase_analytics/protocol/_core/net_utils.py +++ b/acouchbase_analytics/protocol/_core/net_utils.py @@ -34,11 +34,6 @@ async def get_request_ip_async(host: str, port: int, logger_handler: Optional[Ca except ValueError: ip = None - # if we have localhost, httpx does not seem to be able to resolve IPv6 localhost (::1) properly - # TODO: IPv6 support for localhost?? - if host == 'localhost': - ip = '127.0.0.1' - if not ip: result = await anyio.getaddrinfo(host, port, type=socket.SOCK_STREAM, family=socket.AF_UNSPEC) res_ip = choice([addr[4][0] for addr in result]) # nosec B311 diff --git a/acouchbase_analytics/protocol/_core/request_context.py b/acouchbase_analytics/protocol/_core/request_context.py index e8bc22d..10a15f8 100644 --- a/acouchbase_analytics/protocol/_core/request_context.py +++ b/acouchbase_analytics/protocol/_core/request_context.py @@ -29,9 +29,6 @@ class AsyncRequestContext: - # TODO: AsyncExitStack?? - # https://anyio.readthedocs.io/en/stable/cancellation.html - def __init__( self, client_adapter: _AsyncClientAdapter, @@ -192,7 +189,6 @@ async def _trace_handler(self, event_name: str, _: str) -> None: self._cancel_scope_deadline_updated = True def _update_cancel_scope_deadline(self, deadline: float, is_absolute: Optional[bool] = False) -> None: - # TODO: confirm scenario of get_time() < self._taskgroup.cancel_scope.deadline is handled by anyio new_deadline = deadline if is_absolute else get_time() + deadline current_time = get_time() if current_time >= new_deadline: @@ -235,10 +231,6 @@ def create_response_task(self, fn: Callable[..., Coroutine[Any, Any, Any]], *arg raise RuntimeError('Async backend loop is not initialized.') task_name = f'{self._id}-response-task' task: Task[Any] = self._backend.loop.create_task(fn(*args), name=task_name) - # TODO: Confirm if callback is useful/necessary; - # def task_done(t: Task[Any]) -> None: - # print(f'Task done callback task=({t.get_name()}); done: {t.done()}, cancelled: {t.cancelled()}') - # task.add_done_callback(task_done) self._response_task = task return task @@ -416,7 +408,7 @@ async def shutdown( def start_stream(self, core_response: HttpCoreResponse) -> None: if hasattr(self, '_json_stream'): - # TODO: logging; I don't think this is an error... + self.log_message('JSON stream already exists', LogLevel.WARNING) return self._json_stream = AsyncJsonStream( @@ -437,6 +429,7 @@ async def __aenter__(self) -> AsyncRequestContext: await self._taskgroup.__aenter__() return self + # TODO(PYCO-72): Possible improvement to handling async RequestContext.__aexit__ async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType] ) -> Optional[bool]: @@ -447,5 +440,4 @@ async def __aexit__( finally: self._maybe_set_request_error(exc_type, exc_val) del self._taskgroup - # TODO: should we suppress here (e.g., return True) return None # noqa: B012 diff --git a/acouchbase_analytics/protocol/cluster.py b/acouchbase_analytics/protocol/cluster.py index 8e4ef32..5173bdb 100644 --- a/acouchbase_analytics/protocol/cluster.py +++ b/acouchbase_analytics/protocol/cluster.py @@ -28,6 +28,7 @@ from acouchbase_analytics.protocol._core.client_adapter import _AsyncClientAdapter from acouchbase_analytics.protocol._core.request_context import AsyncRequestContext from acouchbase_analytics.protocol.streaming import AsyncHttpStreamingResponse +from couchbase_analytics.common.logging import LogLevel from couchbase_analytics.common.result import AsyncQueryResult from couchbase_analytics.protocol._core.request import _RequestBuilder @@ -92,12 +93,13 @@ async def shutdown(self) -> None: if self.has_client: await self._shutdown() else: - # TODO: log warning - print('Cluster does not have a connection. Ignoring') + self.client_adapter.log_message('Cluster does not have a connection. Ignoring shutdown.', LogLevel.WARNING) async def _execute_query(self, http_resp: AsyncHttpStreamingResponse) -> AsyncQueryResult: if not self.has_client: - # TODO: add log message?? + self.client_adapter.log_message( + 'Cluster does not have a connection. Creating the client.', LogLevel.WARNING + ) await self._create_client() await http_resp.send_request() return AsyncQueryResult(http_resp) diff --git a/acouchbase_analytics/protocol/scope.py b/acouchbase_analytics/protocol/scope.py index 631f32e..7a6f6a0 100644 --- a/acouchbase_analytics/protocol/scope.py +++ b/acouchbase_analytics/protocol/scope.py @@ -27,6 +27,7 @@ from acouchbase_analytics.protocol._core.client_adapter import _AsyncClientAdapter from acouchbase_analytics.protocol._core.request_context import AsyncRequestContext from acouchbase_analytics.protocol.streaming import AsyncHttpStreamingResponse +from couchbase_analytics.common.logging import LogLevel from couchbase_analytics.common.result import AsyncQueryResult from couchbase_analytics.protocol._core.request import _RequestBuilder @@ -63,7 +64,9 @@ async def _create_client(self) -> None: async def _execute_query(self, http_resp: AsyncHttpStreamingResponse) -> AsyncQueryResult: if not self.client_adapter.has_client: - # TODO: add log message?? + self.client_adapter.log_message( + 'Cluster does not have a connection. Creating the client.', LogLevel.WARNING + ) await self._create_client() await http_resp.send_request() return AsyncQueryResult(http_resp) diff --git a/couchbase_analytics/common/_core/duration_str_utils.py b/couchbase_analytics/common/_core/duration_str_utils.py index 5b582d0..a48ba52 100644 --- a/couchbase_analytics/common/_core/duration_str_utils.py +++ b/couchbase_analytics/common/_core/duration_str_utils.py @@ -18,7 +18,7 @@ from couchbase_analytics.common._core.utils import is_null_or_empty -# TODO: Apparently Go does not allow a leading decimal point without a leading zero, e.g., ".5s" is invalid. +# NOTE: Apparently Go does not allow a leading decimal point without a leading zero, e.g., ".5s" is invalid. # We allowed this in the Columnar SDK due to how the C++ client parsed durations DURATION_PATTERN = re.compile(r'^([-+]?)((\d*(\.\d*)?){1}(?:ns|us|µs|μs|ms|s|m|h){1})+$') DURATION_PAIRS_PATTERN = re.compile(r'(\d*(?:\.\d*)?)(ns|us|ms|s|m|h)') diff --git a/couchbase_analytics/common/_core/query.py b/couchbase_analytics/common/_core/query.py index 2e798bd..e67cac9 100644 --- a/couchbase_analytics/common/_core/query.py +++ b/couchbase_analytics/common/_core/query.py @@ -87,7 +87,6 @@ def build_query_metadata(json_data: Optional[Any] = None, raw_metadata: Optional 'warnings': warnings, } - # TODO: include status in metadata?? Seems to only be populated in error scenario if 'status' in json_data: metadata['status'] = json_data.get('status', '') diff --git a/couchbase_analytics/common/_core/utils.py b/couchbase_analytics/common/_core/utils.py index 8c785c4..88cede1 100644 --- a/couchbase_analytics/common/_core/utils.py +++ b/couchbase_analytics/common/_core/utils.py @@ -112,7 +112,6 @@ def __call__(self, value: Any) -> str: if isinstance(value, str): if value in (x.value for x in expected_type): - # TODO: use warning -- maybe don't want to allow str representation? return value raise ValueError(f"Invalid str representation of {expected_type}. Received '{value}'.") diff --git a/couchbase_analytics/protocol/_core/net_utils.py b/couchbase_analytics/protocol/_core/net_utils.py index b311058..935e0d3 100644 --- a/couchbase_analytics/protocol/_core/net_utils.py +++ b/couchbase_analytics/protocol/_core/net_utils.py @@ -32,11 +32,6 @@ def get_request_ip(host: str, port: int, logger_handler: Optional[Callable[..., except ValueError: ip = None - # if we have localhost, httpx does not seem to be able to resolve IPv6 localhost (::1) properly - # TODO: IPv6 support for localhost?? - if host == 'localhost': - ip = '127.0.0.1' - if not ip: result = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM, family=socket.AF_UNSPEC) res_ip = choice([addr[4][0] for addr in result]) # nosec B311 diff --git a/couchbase_analytics/protocol/_core/request.py b/couchbase_analytics/protocol/_core/request.py index 29b9e9e..57da2ee 100644 --- a/couchbase_analytics/protocol/_core/request.py +++ b/couchbase_analytics/protocol/_core/request.py @@ -109,7 +109,6 @@ def __init__( self._extensions: RequestExtensions = { 'timeout': {'pool': connect_timeout, 'connect': connect_timeout, 'read': self._default_query_timeout} } - # TODO: warning if we have a secure connection, but the sni_hostname is not set? if self._conn_details.is_secure() and self._conn_details.sni_hostname is not None: self._extensions['sni_hostname'] = self._conn_details.sni_hostname diff --git a/couchbase_analytics/protocol/_core/request_context.py b/couchbase_analytics/protocol/_core/request_context.py index 991f2a4..20ea378 100644 --- a/couchbase_analytics/protocol/_core/request_context.py +++ b/couchbase_analytics/protocol/_core/request_context.py @@ -4,8 +4,8 @@ import math import time from concurrent.futures import CancelledError, Future, ThreadPoolExecutor -from threading import Event, Lock -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, List, Optional, Union +from threading import Event +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union from uuid import uuid4 from httpx import Response as HttpCoreResponse @@ -27,27 +27,6 @@ from couchbase_analytics.protocol._core.request import QueryRequest -# TODO: might not be needed; need to validate httpx iterator behavior -class ThreadSafeBytesIterator: - def __init__(self, iterator: Iterator[bytes]): - if not hasattr(iterator, '__next__'): - raise TypeError('Provided object is not an iterator (missing __next__ method).') - self._iterator = iterator - self._lock = Lock() - - def __iter__(self) -> ThreadSafeBytesIterator: - return self - - def __next__(self) -> bytes: - with self._lock: # Acquire the lock before accessing the iterator - try: - item = next(self._iterator) - return item - except StopIteration: - # Always re-raise StopIteration to signal the end of iteration - raise - - class BackgroundRequest: def __init__( self, bg_future: Future[BlockingQueryResult], user_future: Future[BlockingQueryResult], cancel_event: Event @@ -185,7 +164,7 @@ def _check_cancelled_or_timed_out(self) -> None: self._request_state = RequestState.Timeout def _create_stage_notification_future(self) -> None: - # TODO: custom ThreadPoolExecutor, to get a "plain" future + # TODO(PYCO-75): custom ThreadPoolExecutor, to get a "plain" future if self._stage_notification_ft is not None: raise RuntimeError('Stage notification future already created for this context.') self._stage_notification_ft = Future[ParsedResultType]() @@ -407,7 +386,7 @@ def send_request_in_background( ) -> Future[BlockingQueryResult]: if self._background_request is not None: raise RuntimeError('Background reqeust already created for this context.') - # TODO: custom ThreadPoolExecutor, to get a "plain" future + # TODO(PYCO-75): custom ThreadPoolExecutor, to get a "plain" future user_ft = Future[BlockingQueryResult]() background_work_ft = self._tp_executor.submit(fn, *args) self._background_request = BackgroundRequest(background_work_ft, user_ft, self._cancel_event) @@ -441,7 +420,7 @@ def start_stream(self, core_response: HttpCoreResponse) -> None: self.log_message('JSON stream already exists', LogLevel.WARNING) return - # TODO: need to confirm if the httpx Response iterator is thread-safe + # TODO(PYCO-73): Potentially use new iterator if problems w/ httpx self._json_stream = JsonStream( core_response.iter_bytes(), stream_config=self._stream_config, logger_handler=self.log_message ) diff --git a/couchbase_analytics/protocol/cluster.py b/couchbase_analytics/protocol/cluster.py index f1e676d..7b6a52a 100644 --- a/couchbase_analytics/protocol/cluster.py +++ b/couchbase_analytics/protocol/cluster.py @@ -41,7 +41,7 @@ def __init__( self._client_adapter = _ClientAdapter(http_endpoint, credential, options, **kwargs) self._request_builder = _RequestBuilder(self._client_adapter) self._create_client() - # TODO: make a custom ThreadPoolExecutor, so that we can override submit and have a way to get + # TODO(PYCO-75): make a custom ThreadPoolExecutor, so that we can override submit and have a way to get # a "plain" future as the docs say we should create a future via an executor # The RequestContext generates a future that enables some background processing # Allow the default max_workers which is (as of Python 3.8): min(32, os.cpu_count() + 4). diff --git a/couchbase_analytics/protocol/connection.py b/couchbase_analytics/protocol/connection.py index c90948b..1f29d39 100644 --- a/couchbase_analytics/protocol/connection.py +++ b/couchbase_analytics/protocol/connection.py @@ -120,7 +120,9 @@ def parse_query_string_value(value: List[str], enforce_str: Optional[bool] = Fal return v -def parse_query_str_options(query_str_opts: Dict[str, List[str]]) -> Dict[str, QueryStrVal]: +def parse_query_str_options( + query_str_opts: Dict[str, List[str]], logger_name: Optional[str] = None +) -> Dict[str, QueryStrVal]: final_opts: Dict[str, QueryStrVal] = {} for k, v in query_str_opts.items(): tokens = k.split('.') @@ -131,8 +133,9 @@ def parse_query_str_options(query_str_opts: Dict[str, List[str]]) -> Dict[str, Q val = parse_query_string_value(v, enforce_str=True) final_opts[tokens[1]] = parse_duration_str(cast(str, val)) else: - print('Warning: Unrecognized query string option:', k) - # TODO: exceptions -- this means the user passed in an invalid option + if logger_name is not None: + logger = logging.getLogger(logger_name) + logger.warning(f'Unrecognized query string option: {k}') pass else: if k in SecurityOptions.VALID_OPTION_KEYS: @@ -189,7 +192,6 @@ def is_secure(self) -> bool: def validate_security_options(self) -> None: # noqa: C901 security_opts: Optional[SecurityOptionsTransformedKwargs] = self.cluster_options.get('security_options') - # TODO: security settings if security_opts is not None: # separate between value options and boolean option (trust_only_capella) solo_security_opts = ['trust_only_pem_file', 'trust_only_pem_str', 'trust_only_certificates'] @@ -256,7 +258,7 @@ def create( ClusterOptionsTransformedKwargs, kwargs, options, - query_str_opts=parse_query_str_options(query_str_opts), + query_str_opts=parse_query_str_options(query_str_opts, logger_name=logger_name), ) default_deserializer = cluster_opts.pop('deserializer', None) diff --git a/couchbase_analytics/tests/duration_parsing_t.py b/couchbase_analytics/tests/duration_parsing_t.py index 7fce156..f0b3325 100644 --- a/couchbase_analytics/tests/duration_parsing_t.py +++ b/couchbase_analytics/tests/duration_parsing_t.py @@ -59,7 +59,7 @@ def test_invalid_durations(self, duration: str) -> None: ('1.234h', 1.234 * 3.6e6), ('1h30m0s', 5.4e6), ('0.1h10m', 9.6e5), - # TODO: apparently this is invalid in Go, but was okay w/ C++ implementation + # NOTE: apparently this is invalid in Go, but was okay w/ C++ implementation ('.1h10m', 9.6e5), ('0001h00010m', 4.2e6), ('100ns', 1e-4),