diff --git a/README.rst b/README.rst index ed5136699c..55e6c15244 100644 --- a/README.rst +++ b/README.rst @@ -7,9 +7,6 @@ Hazelcast Python Client .. image:: https://img.shields.io/readthedocs/hazelcast :target: https://hazelcast.readthedocs.io :alt: Read the Docs -.. image:: https://img.shields.io/badge/slack-chat-green.svg - :target: https://slack.hazelcast.com - :alt: Join the community on Slack .. image:: https://img.shields.io/pypi/l/hazelcast-python-client :target: https://github.com/hazelcast/hazelcast-python-client/blob/master/LICENSE.txt :alt: License @@ -161,7 +158,6 @@ development/usage issues: - `GitHub repository `__ - `Documentation `__ -- `Slack `__ Contributing ------------ diff --git a/docs/getting_help.rst b/docs/getting_help.rst index 95de9eecc9..8c651bbf21 100644 --- a/docs/getting_help.rst +++ b/docs/getting_help.rst @@ -5,4 +5,3 @@ You can use the following channels for your questions and development/usage issues: - `Github Repository `__ -- `Slack `__ diff --git a/docs/index.rst b/docs/index.rst index 3f6f779276..f226881b09 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -7,9 +7,6 @@ Hazelcast Python Client .. image:: https://img.shields.io/static/v1?label=Github&message=Hazelcast%20Python%20client&style=flat&logo=github :target: https://github.com/hazelcast/hazelcast-python-client :alt: Github Repository -.. image:: https://img.shields.io/badge/slack-chat-green.svg - :target: https://slack.hazelcast.com - :alt: Join the community on Slack .. 
image:: https://img.shields.io/pypi/l/hazelcast-python-client :target: https://github.com/hazelcast/hazelcast-python-client/blob/master/LICENSE.txt :alt: License diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index 0cfbf6fe74..ccc895aa73 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -12,6 +12,7 @@ from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection +from hazelcast.internal.asyncio_sql import _InternalSqlService, SqlService from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService from hazelcast.near_cache import NearCacheManager @@ -188,6 +189,13 @@ def __init__(self, config: Config | None = None, **kwargs): self._near_cache_manager, self._send_state_to_cluster, ) + self._internal_sql_service = _InternalSqlService( + self._connection_manager, + self._serialization_service, + self._invocation_service, + self._compact_schema_service.send_schema_and_retry, + ) + self._sql_service = SqlService(self._internal_sql_service) self._load_balancer = self._init_load_balancer(config) self._listener_service = ListenerService( self, @@ -480,6 +488,11 @@ def cluster_service(self) -> ClusterService: """ return self._cluster_service + @property + def sql(self) -> SqlService: + """Returns a service to execute distributed SQL queries.""" + return self._sql_service + def _create_address_provider(self): config = self._config cluster_members = config.cluster_members diff --git a/hazelcast/internal/asyncio_sql.py b/hazelcast/internal/asyncio_sql.py new file mode 100644 index 0000000000..2397abc92f --- /dev/null +++ b/hazelcast/internal/asyncio_sql.py @@ -0,0 +1,662 @@ +import asyncio +import 
logging +import typing +import uuid + +from hazelcast.protocol.codec import sql_execute_codec, sql_fetch_codec, sql_close_codec +from hazelcast.internal.asyncio_invocation import Invocation, InvocationService +from hazelcast.serialization.compact import SchemaNotReplicatedError, SchemaNotFoundError +from hazelcast.sql import ( + SqlExpectedResultType, + _TIMEOUT_NOT_SET, + _DEFAULT_CURSOR_BUFFER_SIZE, + _IteratorBase, + SqlRow, + _SqlQueryId, + HazelcastSqlError, + _SqlErrorCode, + SqlRowMetadata, + _ExecuteResponse, + _SqlStatement, +) +from hazelcast.util import ( + to_millis, + try_to_get_error_message, +) + +_logger = logging.getLogger(__name__) + + +class SqlService: + """A service to execute SQL statements. + + Warnings: + + In order to use this service, Jet engine must be enabled on the members + and the ``hazelcast-sql`` module must be in the classpath of the + members. + + If you are using the CLI, Docker image, or distributions to start + Hazelcast members, then you don't need to do anything, as the above + preconditions are already satisfied for such members. + + However, if you are using Hazelcast members in the embedded mode, or + receiving errors saying that ``The Jet engine is disabled`` or + ``Cannot execute SQL query because "hazelcast-sql" module is not in the + classpath.`` while executing queries, enable the Jet engine following + one of the instructions pointed out in the error message, or add the + ``hazelcast-sql`` module to your member's classpath. + + **Overview** + + Hazelcast is currently able to execute distributed SQL queries using the + following connectors: + + - IMap (to query data stored in a :class:`Map `) + - Kafka + - Files + + SQL statements are not atomic. *INSERT*/*SINK* can fail and commit part of + the data. + + **Usage** + + Before you can access any object using SQL, a *mapping* has to be created. + See the documentation for the ``CREATE MAPPING`` command. + + When a query is executed, an :class:`SqlResult` is returned. 
You may get + row iterator from the result. The result must be closed at the end. The + iterator will close the result automatically when it is exhausted given + that no error is raised during the iteration. The code snippet below + demonstrates a typical usage pattern: :: + + client = await HazelcastClient.create_and_start() + result = await client.sql.execute("SELECT * FROM person") + async for row in result: + print(row.get_object("person_id")) + print(row.get_object("name")) + ... + + See the documentation of the :class:`SqlResult` for more information about + different iteration methods. + """ + + def __init__(self, internal_sql_service): + self._service = internal_sql_service + + async def execute( + self, + sql: str, + *params: typing.Any, + cursor_buffer_size: int = _DEFAULT_CURSOR_BUFFER_SIZE, + timeout: float = _TIMEOUT_NOT_SET, + expected_result_type: int = SqlExpectedResultType.ANY, + schema: str = None + ) -> "SqlResult": + """Executes an SQL statement. + + Args: + sql: SQL string. + *params: Query parameters that will replace the placeholders at + the server-side. You may define parameter placeholders in the + query with the ``?`` character. For every placeholder, a + parameter value must be provided. + cursor_buffer_size: The cursor buffer size measured in the + number of rows. + + When a statement is submitted for execution, a + :class:`SqlResult` is returned as a result. When rows are ready + to be consumed, they are put into an internal buffer of the + cursor. This parameter defines the maximum number of rows in + that buffer. When the threshold is reached, the backpressure + mechanism will slow down the execution, possibly to a complete + halt, to prevent out-of-memory. + + Only positive values are allowed. + + The default value is expected to work well for most workloads. + A bigger buffer size may give you a slight performance boost + for queries with large result sets at the cost of increased + memory consumption. + + Defaults to ``4096``. 
+ timeout: The execution timeout in seconds. + + If the timeout is reached for a running statement, it will be + cancelled forcefully. + + Zero value means no timeout. ``-1`` means that the value from + the server-side config will be used. Other negative values are + prohibited. + + Defaults to ``-1``. + expected_result_type: The expected result type. + schema: The schema name. + + The engine will try to resolve the non-qualified object + identifiers from the statement in the given schema. If not + found, the default search path will be used. + + The schema name is case sensitive. For example, ``foo`` and + ``Foo`` are different schemas. + + The default value is ``None`` meaning only the default search + path is used. + + Returns: + The execution result. + + Raises: + HazelcastSqlError: In case of execution error. + AssertionError: If the ``sql`` parameter is not a string, the + ``schema`` is not a string or ``None``, the ``timeout`` is not + an integer or float, or the ``cursor_buffer_size`` is not an + integer. + ValueError: If the ``sql`` parameter is an empty string, the + ``timeout`` is negative and not equal to ``-1``, the + ``cursor_buffer_size`` is not positive. + TypeError: If the ``expected_result_type`` does not equal to one of + the values or names of the members of the + :class:`SqlExpectedResultType`. + """ + return await self._service.execute( + sql, params, cursor_buffer_size, timeout, expected_result_type, schema + ) + + +class _AsyncIterator(_IteratorBase): + """An iterator that produces infinite stream of Futures. It is the + responsibility of the user to either call them in blocking fashion, + or call ``next`` only if the current call to next did not raise + ``StopAsyncIteration`` error (possibly with callback-based code). + """ + + def __aiter__(self): + return self + + async def __anext__(self): + has_next = await self._has_next() + if not has_next: + # Iterator is exhausted, raise this to inform the user. 
+ # If the user continues to call next, we will continuously + # raise this. + raise StopAsyncIteration + + row = self._get_current_row() + self.position += 1 + return SqlRow(self.row_metadata, row) + + async def _has_next(self) -> bool: + if self.position == self.row_count: + # We exhausted the current page. + if self.is_last: + # This was the last page, no row left + # on the server side. + return False + + # It seems that there are some rows left on the server. + # Fetch them, and then return. + page_fut = await self.fetch_fn() + page = await page_fut + self.on_next_page(page) + return await self._has_next() + + # There are some elements left in the current page. + return True + + +class SqlResult(typing.AsyncIterable[SqlRow]): + """SQL query result. + + Depending on the statement type it represents a stream of + rows or an update count. + + result = await client.sql.execute("SELECT ...") + async for row in result: + # Process the row. + print(row) + + Note that, iterators can be requested at most once per SqlResult. + + One can call :func:`close` method of a result object to + release the resources associated with the result on the server side. + It might also be used to cancel query execution on the server side + if it is still active. + + When the blocking API is used, one might also use ``async with`` + statement to automatically close the query even if an exception + is thrown in the iteration. :: + + async with await client.sql.execute("SELECT ...").result() as result: + async for row in result: + # Process the row. + print(row) + + + To get the number of rows updated by the query, use the + :func:`update_count`. :: + + result = await client.sql.execute("UPDATE ...") + update_count = result.update_count() + + One does not have to call :func:`close` in this case, because the result + will already be closed in the server-side. 
+ """ + + def __init__(self, sql_service, connection, query_id, cursor_buffer_size, execute_response): + self._sql_service = sql_service + self._connection = connection + self._query_id = query_id + self._cursor_buffer_size = cursor_buffer_size + self._lock = asyncio.Lock() + self._execute_response = execute_response + self._iterator_requested = False + self._closed = self._is_closed(execute_response) + self._fetch_task: asyncio.Task | None = None + self._fetch_future: asyncio.Future | None = None + + def iterator(self) -> typing.AsyncIterator[SqlRow]: + """Returns the iterator over the result rows. + + The iterator may be requested only once. + + Raises: + ValueError: If the result only contains an update count, or the + iterator is already requested. + + Returns: + Iterator that produces Future of :class:`SqlRow` s. See the class + documentation for the correct way to use this. + """ + return self._get_iterator() + + def is_row_set(self) -> bool: + """Returns whether this result has rows to iterate.""" + # By design, if the row_metadata (or row_page) is None, + # we only got the update count. + return self._execute_response.row_metadata is not None + + def update_count(self) -> int: + """Returns the number of rows updated by the statement or ``-1`` if + this result is a row set. In case the result doesn't contain rows but + the update count isn't applicable or known, ``0`` is returned. + """ + # This will be set to -1, when we got row set on the client side. + # See _on_execute_response. + return self._execute_response.update_count + + def get_row_metadata(self) -> SqlRowMetadata: + """Gets the row metadata. + + Raises: + ValueError: If the result only contains an update count. + """ + + response = self._execute_response + if not response.row_metadata: + raise ValueError("This result contains only update count") + + return response.row_metadata + + async def close(self) -> None: + """Release the resources associated with the query result. 
+ + The query engine delivers the rows asynchronously. The query may + become inactive even before all rows are consumed. The invocation + of this command will cancel the execution of the query on all members + if the query is still active. Otherwise it is no-op. For a result + with an update count it is always no-op. + + The returned Future results with: + + - :class:`HazelcastSqlError`: In case there is an error closing the + result. + """ + + async with self._lock: + if self._closed: + # Do nothing if the result is already closed. + return None + + error = HazelcastSqlError( + self._sql_service.get_client_id(), + _SqlErrorCode.CANCELLED_BY_USER, + "Query was cancelled by the user", + None, + ) + if not self._fetch_future: + # Make sure that all subsequent fetches will fail. + # XXX: + self._fetch_future = asyncio.Future() + + self._on_fetch_error_unsafe(error) + self._closed = True + # Send the close request + try: + await self._sql_service.close(self._connection, self._query_id) + except Exception as e: + # If the close request is failed somehow, + # wrap it in a HazelcastSqlError. + raise self._sql_service.re_raise(e, self._connection) + + def __aiter__(self): + return self._get_iterator() + + def _get_iterator(self): + response = self._execute_response + if not response.row_metadata: + # Can't get an iterator when we only have update count + raise ValueError("This result contains only update count") + + if self._iterator_requested: + # Can't get an iterator when we already get one + raise ValueError("Iterator can be requested only once") + + self._iterator_requested = True + iterator = _AsyncIterator( + response.row_metadata, + self._fetch_next_page, + ) + # Pass the first page information to the iterator + iterator.on_next_page(response.row_page) + return iterator + + async def _fetch_next_page(self): + # Fetches the next page, if there is no fetch request in-flight. 
+ async with self._lock: + if self._fetch_future: + # A fetch request is already in-flight, return it. + return self._fetch_future + + future = asyncio.Future() + self._fetch_future = future + self._fetch_task = asyncio.create_task(self._handle_fetch_response()) + return future + + async def _handle_fetch_response(self): + # Handles the result of the fetch request, by either: + # - setting it to exception, so that the future calls to + # fetch fails immediately. + # - setting it to next page, and setting self._fetch_future + # to None so that the next fetch request might actually + # try to fetch something from the server. + try: + response = await self._sql_service.fetch( + self._connection, self._query_id, self._cursor_buffer_size + ) + response_error = response["error"] + if response_error: + # There is a server side error sent to client. + sql_error = HazelcastSqlError( + response_error.originating_member_uuid, + response_error.code, + response_error.message, + None, + response_error.suggestion, + ) + await self._on_fetch_error(sql_error) + return + + # The result contains the next page, as expected. + await self._on_fetch_response(response["row_page"]) + except Exception as e: + # Something went bad, we couldn't get response from + # the server, invocation failed. + await self._on_fetch_error(self._sql_service.re_raise(e, self._connection)) + + async def _on_fetch_error(self, error): + # Sets the fetch future with exception, but not resetting it so that the next fetch request fails immediately. + async with self._lock: + self._on_fetch_error_unsafe(error) + + def _on_fetch_error_unsafe(self, error): + # Sets the fetch future with exception, but not resetting it so that the next fetch request fails immediately. + self._fetch_future.set_exception(error) + + async def _on_fetch_response(self, page): + # Sets the fetch future with the next page, resets it, and if this is the last page, marks the result as closed. 
+ async with self._lock: + future = self._fetch_future + self._fetch_future = None + if page.is_last: + # This is the last page, there is nothing + # more on the server. + self._closed = True + + # Resolving the future before resetting self._fetch_future + # might result in an infinite loop for non-blocking iterators + future.set_result(page) + + @staticmethod + def _is_closed(execute_response): + # Returns whether the result is already closed or not. + # + # Result might be closed if the first response + # + # - contains the last page of the rowset (single page rowset) + # - contains just the update count + return ( + execute_response.row_metadata is None # Just an update count + or execute_response.row_page.is_last # Single page result + ) + + async def __aenter__(self): + # The response for the execute request is already + # received. There is nothing more to do. + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + # Ignoring the possible exception details + # since we close the query regardless of that. + await self.close() + + +class _InternalSqlService: + """Internal SQL service that offers more public API + than the one exposed to the user. + """ + + def __init__( + self, + connection_manager, + serialization_service, + invocation_service: InvocationService, + send_schema_and_retry_fn, + ): + self._connection_manager = connection_manager + self._serialization_service = serialization_service + self._invocation_service = invocation_service + self._send_schema_and_retry_fn = send_schema_and_retry_fn + + async def execute( + self, sql, params, cursor_buffer_size, timeout, expected_result_type, schema + ) -> "SqlResult": + """Constructs a statement and executes it. + + Args: + sql (str): SQL string. + params (tuple): Query parameters. + cursor_buffer_size (int): Cursor buffer size. + timeout (float): Timeout of the query. + expected_result_type (SqlExpectedResultType): Expected result type + of the query. 
+ schema (str or None): The schema name. + + Returns: + SqlResult: The execution result. + """ + statement = _SqlStatement( + sql, params, cursor_buffer_size, timeout, expected_result_type, schema + ) + connection = None + try: + try: + # Serialize the passed parameters. + serialized_params = [ + self._serialization_service.to_data(param) for param in statement.parameters + ] + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry_fn( + e, + self.execute, + sql, + params, + cursor_buffer_size, + timeout, + expected_result_type, + schema, + ) + + connection = self._get_query_connection() + # Create a new, unique query id. + query_id = _SqlQueryId.from_uuid(connection.remote_uuid) + request = sql_execute_codec.encode_request( + statement.sql, + serialized_params, + # to_millis expects None to produce -1 + to_millis(None if statement.timeout == -1 else statement.timeout), + statement.cursor_buffer_size, + statement.schema, + statement.expected_result_type, + query_id, + False, + ) + invocation = Invocation( + request, + connection=connection, + response_handler=lambda m: sql_execute_codec.decode_response(m, self._to_object), + ) + res = await self._invocation_service.ainvoke(invocation) + return SqlResult( + self, + connection, + query_id, + statement.cursor_buffer_size, + self._handle_execute_response(res), + ) + except Exception as e: + raise self.re_raise(e, connection) + + async def fetch(self, connection, query_id, cursor_buffer_size): + """Fetches the next page of the query execution. + + Args: + connection (hazelcast.connection.Connection): Connection + that the first execute request, hence the fetch request + must route to. + query_id (_SqlQueryId): Unique id of the query. + cursor_buffer_size (int): Size of cursor buffer. Same as + the one used in the first execute request. 
+ """ + request = sql_fetch_codec.encode_request(query_id, cursor_buffer_size) + invocation = Invocation( + request, + connection=connection, + response_handler=lambda m: sql_fetch_codec.decode_response(m, self._to_object), + ) + return await self._invocation_service.ainvoke(invocation) + + def get_client_id(self): + """ + Returns: + uuid.UUID: Unique client UUID. + """ + return self._connection_manager.client_uuid + + def re_raise(self, error, connection): + """Returns the error wrapped as the :class:`HazelcastSqlError` + so that it can be raised to the user. + + Args: + error (Exception): The error to reraise. + connection (hazelcast.connection.Connection|None): Connection + that the query requests are routed to. If it is not + live, we will inform the user about the possible + cluster topology change. + + Returns: + HazelcastSqlError: The reraised error. + """ + if connection and not connection.live: + return HazelcastSqlError( + self.get_client_id(), + _SqlErrorCode.CONNECTION_PROBLEM, + "Cluster topology changed while a query was executed: Member cannot be reached: %s" + % connection.remote_address, + error, + ) + + if isinstance(error, HazelcastSqlError): + return error + + return HazelcastSqlError( + self.get_client_id(), _SqlErrorCode.GENERIC, try_to_get_error_message(error), error + ) + + async def close(self, connection, query_id) -> None: + """Closes the remote query cursor. + + Args: + connection (hazelcast.connection.Connection): Connection + that the first execute request, hence the close request + must route to. + query_id (_SqlQueryId): The query id to close. 
+ + Returns: + None + """ + request = sql_close_codec.encode_request(query_id) + invocation = Invocation(request, connection=connection) + await self._invocation_service.ainvoke(invocation) + + def _to_object(self, data): + try: + return self._serialization_service.to_object(data) + except SchemaNotFoundError as e: + raise e + except Exception as e: + raise HazelcastSqlError( + self.get_client_id(), + _SqlErrorCode.GENERIC, + "Failed to deserialize query result value: %s" % try_to_get_error_message(e), + e, + ) + + def _get_query_connection(self): + try: + connection = self._connection_manager.get_random_connection_for_sql() + except Exception as e: + raise self.re_raise(e, None) + + if not connection: + raise HazelcastSqlError( + self.get_client_id(), + _SqlErrorCode.CONNECTION_PROBLEM, + "Client is not connected", + None, + ) + + return connection + + def _handle_execute_response(self, response): + response_error = response["error"] + if response_error: + # There is a server-side error sent to the client. + sql_error = HazelcastSqlError( + response_error.originating_member_uuid, + response_error.code, + response_error.message, + None, + response_error.suggestion, + ) + raise sql_error + + row_metadata = response["row_metadata"] + if row_metadata is not None: + # The result contains some rows, not an update count. + row_metadata = SqlRowMetadata(row_metadata) + # Set the update count to -1. + return _ExecuteResponse(row_metadata, response["row_page"], -1) + + # Result only contains the update count. 
+ return _ExecuteResponse(None, None, response["update_count"]) diff --git a/tests/integration/asyncio/sql_test.py b/tests/integration/asyncio/sql_test.py new file mode 100644 index 0000000000..514a7cda60 --- /dev/null +++ b/tests/integration/asyncio/sql_test.py @@ -0,0 +1,795 @@ +import asyncio +import datetime +import decimal +import math +import types +import unittest +from unittest.mock import patch + +from hazelcast.core import HazelcastJsonValue +from hazelcast.internal.asyncio_client import HazelcastClient +from hazelcast.sql import HazelcastSqlError, SqlExpectedResultType, SqlColumnType +from tests.hzrc.ttypes import Lang +from tests.integration.asyncio.base import SingleMemberTestCase, HazelcastTestCase +from tests.integration.backward_compatible.sql_test import Student, LITE_MEMBER_CONFIG +from tests.util import random_string + + +SERVER_CONFIG = """ + + + + com.hazelcast.client.test.PortableFactory + + + %s + +""" + +JET_ENABLED_CONFIG = """ + +""" + + +class SqlTestBase(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cluster_config = SERVER_CONFIG % JET_ENABLED_CONFIG + cls.cluster = cls.create_cluster(cls.rc, cluster_config) + cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.terminateCluster(cls.cluster.id) + cls.rc.exit() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.client = await HazelcastClient.create_and_start( + cluster_name=self.cluster.id, portable_factories={666: {6: Student}} + ) + self.map_name = random_string() + self.map = await self.client.get_map(self.map_name) + + async def asyncTearDown(self): + await self.map.clear() + await self.shutdown_all_clients() + await super().asyncTearDown() + + async def _populate_map(self, entry_count=10, value_factory=lambda v: v): + entries = {i: value_factory(i) for i in range(entry_count)} + await self.map.put_all(entries) + + async def _create_mapping(self, 
value_format="INTEGER"): + create_mapping_query = """ + CREATE MAPPING "%s" ( + __key INT, + this %s + ) + TYPE IMaP + OPTIONS ( + 'keyFormat' = 'int', + 'valueFormat' = '%s' + ) + """ % ( + self.map_name, + value_format, + value_format.lower(), + ) + + await self.execute(create_mapping_query) + + async def _create_mapping_for_portable(self, factory_id, class_id, columns): + create_mapping_query = """ + CREATE MAPPING "%s" ( + __key INT%s + %s + ) + TYPE IMaP + OPTIONS ( + 'keyFormat' = 'int', + 'valueFormat' = 'portable', + 'valuePortableFactoryId' = '%s', + 'valuePortableClassId' = '%s' + ) + """ % ( + self.map_name, + "," if len(columns) > 0 else "", + ",\n".join(["%s %s" % (c_name, c_type) for c_name, c_type in columns.items()]), + factory_id, + class_id, + ) + await self.execute(create_mapping_query) + + async def execute(self, query, *args): + return await self.client.sql.execute(query, *args) + + async def execute_statement(self, query, *args, **kwargs): + return await self.client.sql.execute(query, *args, **kwargs) + + def update_count(self, result): + return result.update_count() + + def is_row_set(self, result): + return result.is_row_set() + + def get_row_metadata(self, result): + return result.get_row_metadata() + + +class SqlServiceTest(SqlTestBase): + async def test_execute(self): + await self._create_mapping() + entry_count = 11 + await self._populate_map(entry_count) + result = await self.execute('SELECT * FROM "%s"' % self.map_name) + self.assertCountEqual( + [(i, i) for i in range(entry_count)], + [(row.get_object("__key"), row.get_object("this")) async for row in result], + ) + + async def test_execute_with_params(self): + await self._create_mapping() + entry_count = 13 + await self._populate_map(entry_count) + result = await self.execute( + 'SELECT this FROM "%s" WHERE __key > ? AND this > ?' 
% self.map_name, 5, 6 + ) + self.assertCountEqual( + [i for i in range(7, entry_count)], + [row.get_object("this") async for row in result], + ) + + async def test_execute_with_mismatched_params_when_sql_has_more(self): + await self._create_mapping() + await self._populate_map() + with self.assertRaises(HazelcastSqlError): + result = await self.execute( + 'SELECT * FROM "%s" WHERE __key > ? AND this > ?' % self.map_name, 5 + ) + async for _ in result: + pass + + async def test_execute_with_mismatched_params_when_params_has_more(self): + await self._create_mapping() + await self._populate_map() + with self.assertRaises(HazelcastSqlError): + result = await self.execute('SELECT * FROM "%s" WHERE this > ?' % self.map_name, 5, 6) + async for _ in result: + pass + + async def test_execute_statement(self): + await self._create_mapping("VARCHAR") + entry_count = 12 + await self._populate_map(entry_count, str) + result = await self.execute_statement('SELECT this FROM "%s"' % self.map_name) + self.assertCountEqual( + [str(i) for i in range(entry_count)], + [row.get_object_with_index(0) async for row in result], + ) + + async def test_execute_statement_with_params(self): + await self._create_mapping_for_portable(666, 6, {"age": "BIGINT", "height": "REAL"}) + entry_count = 20 + await self._populate_map(entry_count, lambda v: Student(v, v)) + result = await self.execute_statement( + 'SELECT age FROM "%s" WHERE height = CAST(? AS REAL)' % self.map_name, + 13.0, + ) + self.assertCountEqual([13], [row.get_object("age") async for row in result]) + + async def test_execute_statement_with_mismatched_params_when_sql_has_more(self): + await self._create_mapping() + await self._populate_map() + with self.assertRaises(HazelcastSqlError): + result = await self.execute_statement( + 'SELECT * FROM "%s" WHERE __key > ? AND this > ?' 
% self.map_name, + 5, + ) + async for _ in result: + pass + + async def test_execute_statement_with_mismatched_params_when_params_has_more(self): + await self._create_mapping() + await self._populate_map() + with self.assertRaises(HazelcastSqlError): + result = await self.execute_statement( + 'SELECT * FROM "%s" WHERE this > ?' % self.map_name, + 5, + 6, + ) + async for _ in result: + pass + + async def test_execute_statement_with_timeout(self): + await self._create_mapping_for_portable(666, 6, {"age": "BIGINT", "height": "REAL"}) + entry_count = 100 + await self._populate_map(entry_count, lambda v: Student(v, v)) + result = await self.execute_statement( + 'SELECT age FROM "%s" WHERE height < 10' % self.map_name, + timeout=100, + ) + self.assertCountEqual( + [i for i in range(10)], [row.get_object("age") async for row in result] + ) + + async def test_execute_statement_with_cursor_buffer_size(self): + await self._create_mapping_for_portable(666, 6, {"age": "BIGINT", "height": "REAL"}) + entry_count = 50 + await self._populate_map(entry_count, lambda v: Student(v, v)) + result = await self.execute_statement( + 'SELECT age FROM "%s"' % self.map_name, + cursor_buffer_size=3, + ) + with patch.object(result, "_fetch_next_page", wraps=result._fetch_next_page) as patched: + self.assertCountEqual( + [i for i in range(entry_count)], [row.get_object("age") async for row in result] + ) + # -1 comes from the fact that, we don't fetch the first page. + expected = math.ceil(entry_count / 3.0) - 1 + actual = patched.call_count + self.assertEqual(expected, actual) + + # Can't test the case we would expect an update count, because the IMDG SQL + # engine does not support such query as of now. 
+ async def test_execute_statement_with_expected_result_type_of_rows_when_rows_are_expected(self): + await self._create_mapping_for_portable(666, 6, {"age": "BIGINT", "height": "REAL"}) + entry_count = 100 + await self._populate_map(entry_count, lambda v: Student(v, v)) + result = await self.execute_statement( + 'SELECT age FROM "%s" WHERE age < 3' % self.map_name, + expected_result_type=SqlExpectedResultType.ROWS, + ) + + self.assertCountEqual( + [i for i in range(3)], [row.get_object("age") async for row in result] + ) + + # Can't test the case we would expect an update count, because the IMDG SQL + # engine does not support such query as of now. + async def test_execute_statement_with_expected_result_type_of_update_count_when_rows_are_expected( + self, + ): + await self._create_mapping() + await self._populate_map() + + with self.assertRaises(HazelcastSqlError): + result = await self.execute_statement( + 'SELECT * FROM "%s"' % self.map_name, + expected_result_type=SqlExpectedResultType.UPDATE_COUNT, + ) + async for _ in result: + pass + + # Can't test the schema, because the IMDG SQL engine does not support + # specifying a schema yet. 
    async def test_provided_suggestions(self):
        """Querying a map without a mapping raises an error whose
        ``suggestion`` holds a CREATE MAPPING statement that, once executed,
        makes the original query succeed."""
        # We don't create a mapping intentionally to get suggestions
        await self.map.put(1, "value-1")
        select_all_query = 'SELECT * FROM "%s"' % self.map_name
        with self.assertRaises(HazelcastSqlError) as cm:
            await self.execute(select_all_query)

        # The suggested statement must itself be executable SQL.
        await self.execute(cm.exception.suggestion)
        async with await self.execute(select_all_query) as result:
            self.assertEqual(
                [(1, "value-1")],
                [(r.get_object("__key"), r.get_object("this")) async for r in result],
            )


class SqlResultTest(SqlTestBase):
    """Tests iteration and lifetime behavior of asyncio SQL results."""

    async def test_blocking_iterator(self):
        """A result can be consumed directly with ``async for``."""
        await self._create_mapping()
        await self._populate_map()
        result = await self.execute('SELECT __key FROM "%s"' % self.map_name)
        self.assertCountEqual(
            [i for i in range(10)], [row.get_object_with_index(0) async for row in result]
        )

    async def test_blocking_iterator_when_iterator_requested_more_than_once(self):
        """Iterating the same result a second time raises ValueError."""
        await self._create_mapping()
        await self._populate_map()
        result = await self.execute('SELECT this FROM "%s"' % self.map_name)

        self.assertCountEqual(
            [i for i in range(10)], [row.get_object_with_index(0) async for row in result]
        )
        with self.assertRaises(ValueError):
            async for _ in result:
                pass

    async def test_blocking_iterator_with_multi_paged_result(self):
        """Direct iteration transparently fetches follow-up pages."""
        await self._create_mapping()
        await self._populate_map()
        # Each page will contain just 1 result
        result = await self.execute_statement(
            'SELECT __key FROM "%s"' % self.map_name,
            cursor_buffer_size=1,
        )

        self.assertCountEqual(
            [i for i in range(10)], [row.get_object_with_index(0) async for row in result]
        )

    async def test_iterator(self):
        """An explicit ``result.iterator()`` yields every row of the result."""
        await self._create_mapping()
        await self._populate_map()
        result = await self.execute('SELECT __key FROM "%s"' % self.map_name)
        iterator = result.iterator()
        rows = []
        async for row in iterator:
            rows.append(row.get_object_with_index(0))

        def assertion():
            self.assertCountEqual(
                [i for i in range(10)],
                rows,
            )

        await self.assertTrueEventually(assertion)

    async def test_iterator_when_iterator_requested_more_than_once(self):
        """Requesting a second explicit iterator raises ValueError."""
        await self._create_mapping()
        await self._populate_map()
        result = await self.execute('SELECT this FROM "%s"' % self.map_name)
        iterator = result.iterator()
        rows = []
        async for row in iterator:
            rows.append(row.get_object("this"))

        self.assertCountEqual([i for i in range(10)], rows)
        with self.assertRaises(ValueError):
            result.iterator()

    async def test_iterator_with_multi_paged_result(self):
        """An explicit iterator also fetches follow-up pages transparently."""
        await self._create_mapping()
        await self._populate_map()
        # Each page will contain just 1 result
        result = await self.execute_statement(
            'SELECT __key FROM "%s"' % self.map_name,
            cursor_buffer_size=1,
        )
        iterator = result.iterator()
        rows = []
        async for row in iterator:
            rows.append(row.get_object_with_index(0))

        self.assertCountEqual([i for i in range(10)], rows)

    async def test_request_blocking_iterator_after_iterator(self):
        """Direct ``async for`` after ``iterator()`` was taken raises ValueError."""
        await self._create_mapping()
        await self._populate_map()
        result = await self.execute('SELECT * FROM "%s"' % self.map_name)
        _ = result.iterator()
        with self.assertRaises(ValueError):
            async for _ in result:
                pass

    async def test_request_iterator_after_blocking_iterator(self):
        """``iterator()`` after direct ``async for`` iteration raises ValueError."""
        await self._create_mapping()
        await self._populate_map()
        result = await self.execute('SELECT * FROM "%s"' % self.map_name)
        async for _ in result:
            pass

        with self.assertRaises(ValueError):
            result.iterator()

    # Can't test the case we would expect row to be not set, because the IMDG SQL
    # engine does not support update/insert queries now.
+ async def test_is_row_set_when_row_is_set(self): + await self._create_mapping() + await self._populate_map() + result = await self.execute('SELECT * FROM "%s"' % self.map_name) + self.assertTrue(self.is_row_set(result)) + + # Can't test the case we would expect a non-negative updated count, because the IMDG SQL + # engine does not support update/insert queries now. + async def test_update_count_when_there_is_no_update(self): + await self._create_mapping() + await self._populate_map() + result = await self.execute('SELECT * FROM "%s" WHERE __key > 5' % self.map_name) + self.assertEqual(-1, self.update_count(result)) + + async def test_get_row_metadata(self): + await self._create_mapping("VARCHAR") + await self._populate_map(value_factory=str) + result = await self.execute('SELECT __key, this FROM "%s"' % self.map_name) + row_metadata = self.get_row_metadata(result) + self.assertEqual(2, row_metadata.column_count) + columns = row_metadata.columns + self.assertEqual(SqlColumnType.INTEGER, columns[0].type) + self.assertEqual(SqlColumnType.VARCHAR, columns[1].type) + self.assertTrue(columns[0].nullable) + self.assertTrue(columns[1].nullable) + + async def test_close_after_query_execution(self): + await self._create_mapping() + await self._populate_map() + result = await self.execute('SELECT * FROM "%s"' % self.map_name) + async for _ in result: + pass + + self.assertIsNone(await result.close()) + + async def test_close_when_query_is_active(self): + await self._create_mapping() + await self._populate_map() + # Each page will contain 1 row + result = await self.execute_statement( + 'SELECT * FROM "%s"' % self.map_name, + cursor_buffer_size=1, + ) + # Fetch couple of pages + iterator = result.iterator() + await iterator.__anext__() + self.assertIsNone(await result.close()) + with self.assertRaises(HazelcastSqlError): + # Next fetch requests should fail + await iterator.__anext__() + + async def test_with_statement(self): + await self._create_mapping() + await 
self._populate_map() + async with await self.execute('SELECT this FROM "%s"' % self.map_name) as result: + self.assertCountEqual( + [i for i in range(10)], [row.get_object_with_index(0) async for row in result] + ) + + async def test_with_statement_when_iteration_throws(self): + await self._create_mapping() + await self._populate_map() + with self.assertRaises(RuntimeError): + async with await self.execute_statement( + 'SELECT this FROM "%s"' % self.map_name, + cursor_buffer_size=1, # so that it doesn't close immediately + ) as result: + async for _ in result: + raise RuntimeError("expected") + + res = await result.close() + self.assertEqual(res, None) + + async def test_deserialization_error(self): + # Using a Portable that is not defined on the client-side. + await self._create_mapping_for_portable(666, 1, {}) + + script = ( + """ + var m = instance_0.getMap("%s"); + m.put(1, new com.hazelcast.client.test.Employee(1, "Joe")); + """ + % self.map_name + ) + res = await asyncio.get_running_loop().run_in_executor( + None, self.rc.executeOnController, self.cluster.id, script, Lang.JAVASCRIPT + ) + self.assertTrue(res.success) + with self.assertRaisesRegex(HazelcastSqlError, "Failed to deserialize query result value"): + await self.execute('SELECT __key, this FROM "%s"' % self.map_name) + + async def test_rows_as_dict_or_list(self): + await self._create_mapping("VARCHAR") + entry_count = 20 + + def value_factory(v): + return "value-%s" % v + + await self._populate_map(entry_count, value_factory) + expected = [(i, value_factory(i)) for i in range(entry_count)] + async with await self.execute('SELECT __key, this FROM "%s"' % self.map_name) as result: + # Verify that both row[integer] and row[string] works + self.assertCountEqual(expected, [(row[0], row["this"]) async for row in result]) + + +class SqlColumnTypesReadTest(SqlTestBase): + async def test_varchar(self): + def value_factory(key): + return "val-%s" % key + + await self._create_mapping("VARCHAR") + await 
self._populate_map(value_factory=value_factory) + await self._validate_rows(SqlColumnType.VARCHAR, value_factory) + + async def test_boolean(self): + def value_factory(key): + return key % 2 == 0 + + await self._create_mapping("BOOLEAN") + await self._populate_map(value_factory=value_factory) + await self._validate_rows(SqlColumnType.BOOLEAN, value_factory) + + async def _validate_rows(self, expected_type, value_factory=lambda key: key): + result = await self.execute('SELECT __key, this FROM "%s"' % self.map_name) + await self._validate_result(result, expected_type, value_factory) + + async def test_tiny_int(self): + await self._create_mapping("TINYINT") + await self._populate_map_via_rc("new java.lang.Byte(key)") + await self._validate_rows(SqlColumnType.TINYINT) + + async def test_small_int(self): + await self._create_mapping("SMALLINT") + await self._populate_map_via_rc("new java.lang.Short(key)") + await self._validate_rows(SqlColumnType.SMALLINT) + + async def test_integer(self): + await self._create_mapping("INTEGER") + await self._populate_map_via_rc("new java.lang.Integer(key)") + await self._validate_rows(SqlColumnType.INTEGER) + + async def test_big_int(self): + await self._create_mapping("BIGINT") + await self._populate_map_via_rc("new java.lang.Long(key)") + await self._validate_rows(SqlColumnType.BIGINT) + + async def test_real(self): + await self._create_mapping("REAL") + await self._populate_map_via_rc("new java.lang.Float(key * 1.0 / 8)") + await self._validate_rows(SqlColumnType.REAL, lambda x: x * 1.0 / 8) + + async def test_double(self): + await self._create_mapping("DOUBLE") + await self._populate_map_via_rc("new java.lang.Double(key * 1.0 / 1.1)") + await self._validate_rows(SqlColumnType.DOUBLE, lambda x: x * 1.0 / 1.1) + + async def test_date(self): + def value_factory(key): + return datetime.date(key + 2000, key + 1, key + 1) + + await self._create_mapping("DATE") + await self._populate_map_via_rc("java.time.LocalDate.of(key + 2000, key + 1, 
key + 1)") + await self._validate_rows(SqlColumnType.DATE, value_factory) + + async def test_time(self): + def value_factory(key): + return datetime.time(key, key, key, key) + + await self._create_mapping("TIME") + await self._populate_map_via_rc("java.time.LocalTime.of(key, key, key, key * 1000)") + await self._validate_rows(SqlColumnType.TIME, value_factory) + + async def test_timestamp(self): + def value_factory(key): + return datetime.datetime(key + 2000, key + 1, key + 1, key, key, key, key) + + await self._create_mapping("TIMESTAMP") + await self._populate_map_via_rc( + "java.time.LocalDateTime.of(key + 2000, key + 1, key + 1, key, key, key, key * 1000)" + ) + await self._validate_rows(SqlColumnType.TIMESTAMP, value_factory) + + async def test_timestamp_with_time_zone(self): + def value_factory(key): + return datetime.datetime( + key + 2000, + key + 1, + key + 1, + key, + key, + key, + key, + datetime.timezone(datetime.timedelta(hours=key)), + ) + + await self._create_mapping("TIMESTAMP WITH TIME ZONE") + await self._populate_map_via_rc( + "java.time.OffsetDateTime.of(key + 2000, key + 1, key + 1, key, key, key, key * 1000, " + "java.time.ZoneOffset.ofHours(key))" + ) + await self._validate_rows(SqlColumnType.TIMESTAMP_WITH_TIME_ZONE, value_factory) + + async def test_decimal(self): + def value_factory(key): + return decimal.Decimal((0, (key,), -1 * key)) + + await self._create_mapping("DECIMAL") + await self._populate_map_via_rc("java.math.BigDecimal.valueOf(key, key)") + await self._validate_rows(SqlColumnType.DECIMAL, value_factory) + + async def test_null(self): + await self._create_mapping("INTEGER") + await self._populate_map() + result = await self.execute('SELECT __key, NULL AS this FROM "%s"' % self.map_name) + await self._validate_result(result, SqlColumnType.NULL, lambda _: None) + + async def test_object(self): + def value_factory(key): + return Student(key, key) + + await self._create_mapping_for_portable(666, 6, {"age": "BIGINT", "height": 
"REAL"}) + await self._populate_map(value_factory=value_factory) + result = await self.execute('SELECT __key, this FROM "%s"' % self.map_name) + await self._validate_result(result, SqlColumnType.OBJECT, value_factory) + + async def test_null_only_column(self): + await self._create_mapping("INTEGER") + await self._populate_map() + result = await self.execute( + 'SELECT __key, CAST(NULL AS INTEGER) as this FROM "%s"' % self.map_name + ) + await self._validate_result(result, SqlColumnType.INTEGER, lambda _: None) + + async def test_json(self): + def value_factory(key): + return HazelcastJsonValue({"key": key}) + + await self._create_mapping("JSON") + await self._populate_map(value_factory=value_factory) + result = await self.execute(f'SELECT __key, this FROM "{self.map_name}"') + await self._validate_result(result, SqlColumnType.JSON, value_factory) + + async def _validate_result(self, result, expected_type, factory): + async for row in result: + key = row.get_object("__key") + expected_value = factory(key) + row_metadata = row.metadata + self.assertEqual(2, row_metadata.column_count) + column_metadata = row_metadata.get_column(1) + self.assertEqual(expected_type, column_metadata.type) + self.assertEqual(expected_value, row.get_object("this")) + + async def _populate_map_via_rc(self, new_object_literal): + script = """ + var map = instance_0.getMap("%s"); + for (var key = 0; key < 10; key++) { + map.set(new java.lang.Integer(key), %s); + } + """ % ( + self.map_name, + new_object_literal, + ) + response = await asyncio.get_running_loop().run_in_executor( + None, self.rc.executeOnController, self.cluster.id, script, Lang.JAVASCRIPT + ) + self.assertTrue(response.success) + + +class SqlServiceV5LiteMemberClusterTest(SingleMemberTestCase): + @classmethod + def configure_cluster(cls): + return LITE_MEMBER_CONFIG % JET_ENABLED_CONFIG + + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + async def 
test_execute(self): + with self.assertRaises(HazelcastSqlError) as cm: + async with await self.client.sql.execute("SOME QUERY") as result: + result.update_count() + + # Make sure that exception is originating from the server + self.assertEqual(self.member.uuid, str(cm.exception.originating_member_uuid)) + + +class SqlServiceV5MixedClusterTest(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + + # rc = None + # cluster = None + # is_v5_or_newer_server = None + # client = None + # + # @classmethod + # def setUpClass(cls): + # cls.rc = cls.create_rc() + # cls.is_v5_or_newer_server = compare_server_version_with_rc(cls.rc, "5.0") >= 0 + # + # cluster_config = ( + # LITE_MEMBER_CONFIG % JET_ENABLED_CONFIG + # if cls.is_v5_or_newer_server + # else LITE_MEMBER_CONFIG % "" + # ) + # cls.cluster = cls.create_cluster(cls.rc, cluster_config) + # cls.cluster.start_member() + # cls.cluster.start_member() + # + # script = """instance_0.getCluster().promoteLocalLiteMember();""" + # cls.rc.executeOnController(cls.cluster.id, script, Lang.JAVASCRIPT) + # + # cls.client = HazelcastClient(cluster_name=cls.cluster.id) + + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cluster_config = SERVER_CONFIG % JET_ENABLED_CONFIG + cls.cluster = cls.create_cluster(cls.rc, cluster_config) + cls.cluster.start_member() + cls.cluster.start_member() + script = """instance_0.getCluster().promoteLocalLiteMember();""" + cls.rc.executeOnController(cls.cluster.id, script, Lang.JAVASCRIPT) + + @classmethod + def tearDownClass(cls): + cls.rc.terminateCluster(cls.cluster.id) + cls.rc.exit() + + async def asyncSetUp(self): + await super().asyncSetUp() + self.client = await HazelcastClient.create_and_start( + cluster_name=self.cluster.id, portable_factories={666: {6: Student}} + ) + self.map_name = random_string() + self.map = await self.client.get_map(self.map_name) + + async def asyncTearDown(self): + await self.map.clear() + await self.shutdown_all_clients() + await 
super().asyncTearDown() + + async def test_mixed_cluster(self): + map_name = random_string() + + create_mapping_query = ( + """ + CREATE MAPPING "%s" ( + __key INT, + this INT + ) + TYPE IMaP + OPTIONS ( + 'keyFormat' = 'int', + 'valueFormat' = 'int' + ) + """ + % map_name + ) + + await self.client.sql.execute(create_mapping_query) + m = await self.client.get_map(map_name) + await m.put(1, 1) + async with await self.client.sql.execute('SELECT this FROM "%s"' % map_name) as result: + rows = [row.get_object("this") async for row in result] + + self.assertEqual(1, len(rows)) + self.assertEqual(1, rows[0]) + + +class JetSqlTest(SqlTestBase): + async def test_streaming_sql_query(self): + async with await self.execute("SELECT * FROM TABLE(generate_stream(100))") as result: + idx = 0 + async for row in result: + self.assertEqual(idx, row.get_object("v")) + if idx == 200: + break + idx += 1 + + async def test_federated_query(self): + query = ( + """ + CREATE MAPPING "%s" ( + __key INT, + name VARCHAR, + age INT + ) + TYPE IMap + OPTIONS ( + 'keyFormat' = 'int', + 'valueFormat' = 'json-flat' + ) + """ + % self.map_name + ) + + await self.execute(query) + insert_into_query = ( + """ + INSERT INTO "%s" (__key, name, age) + VALUES (1, 'John', 42) + """ + % self.map_name + ) + + async with await self.execute(insert_into_query) as result: + self.assertEqual(0, self.update_count(result)) + + self.assertEqual(1, await self.map.size()) + entry = await self.map.get(1) + self.assertEqual({"name": "John", "age": 42}, entry.loads()) diff --git a/tests/integration/backward_compatible/sql_test.py b/tests/integration/backward_compatible/sql_test.py index 2911690896..9b6db37a80 100644 --- a/tests/integration/backward_compatible/sql_test.py +++ b/tests/integration/backward_compatible/sql_test.py @@ -828,7 +828,7 @@ def value_factory(key): self._create_mapping("JSON") self._populate_map(value_factory=value_factory) - result = self.execute(f"SELECT __key, this FROM {self.map_name}") + result 
= self.execute(f'SELECT __key, this FROM "{self.map_name}"') self._validate_result(result, SqlColumnType.JSON, value_factory) def _validate_rows(self, expected_type, value_factory=lambda key: key): @@ -974,7 +974,7 @@ def test_mixed_cluster(self): create_mapping_query = ( """ - CREATE MAPPING %s ( + CREATE MAPPING "%s" ( __key INT, this INT ) @@ -991,7 +991,7 @@ def test_mixed_cluster(self): m = self.client.get_map(map_name).blocking() m.put(1, 1) - with self.client.sql.execute("SELECT this FROM %s" % map_name).result() as result: + with self.client.sql.execute('SELECT this FROM "%s"' % map_name).result() as result: rows = [row.get_object("this") for row in result] self.assertEqual(1, len(rows))