diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 099043eae0..36b8eaa4e9 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -2960,9 +2960,10 @@ def _create_response_future(self, query, parameters, trace, custom_payload, "2 or higher (supported in Cassandra 2.0 and higher). Consider " "setting Cluster.protocol_version to 2 to support this operation.") statement_keyspace = query.keyspace if ProtocolVersion.uses_keyspace_flag(self._protocol_version) else None + batch_timestamp = query.timestamp if query.timestamp is not None else timestamp message = BatchMessage( query.batch_type, query._statements_and_parameters, cl, - serial_cl, timestamp, statement_keyspace) + serial_cl, batch_timestamp, statement_keyspace) elif isinstance(query, GraphStatement): # the statement_keyspace is not aplicable to GraphStatement message = QueryMessage(query.query, cl, serial_cl, fetch_size, diff --git a/cassandra/cqlengine/connection.py b/cassandra/cqlengine/connection.py index bf3e55a2e8..8d6cf52508 100644 --- a/cassandra/cqlengine/connection.py +++ b/cassandra/cqlengine/connection.py @@ -17,7 +17,7 @@ import threading from cassandra.cluster import Cluster, _ConfigMode, _NOT_SET, NoHostAvailable, UserTypeDoesNotExist, ConsistencyLevel -from cassandra.query import SimpleStatement, dict_factory +from cassandra.query import SimpleStatement, BatchStatement, dict_factory from cassandra.cqlengine import CQLEngineException from cassandra.cqlengine.statements import BaseCQLStatement @@ -340,7 +340,12 @@ def execute(query, params=None, consistency_level=None, timeout=NOT_SET, connect if not conn.session: raise CQLEngineException("It is required to setup() cqlengine before executing queries") - if isinstance(query, SimpleStatement): + if isinstance(query, BatchStatement): + log.debug(format_log_context('Executing BatchStatement with {} statements'.format( + len(query._statements_and_parameters)), connection=connection)) + result = conn.session.execute(query, timeout=timeout) + return result + elif isinstance(query, SimpleStatement): pass # elif isinstance(query, BaseCQLStatement): params = query.get_context() diff --git a/cassandra/cqlengine/query.py b/cassandra/cqlengine/query.py index afc7ceeef6..81c3ff1c10 100644 --- a/cassandra/cqlengine/query.py +++ b/cassandra/cqlengine/query.py @@ -226,36 +226,42 @@ def execute(self): self._execute_callbacks() return - batch_type = None if self.batch_type is CBatchType.LOGGED else self.batch_type - opener = 'BEGIN ' + (str(batch_type) + ' ' if batch_type else '') + ' BATCH' - if self.timestamp: - - if isinstance(self.timestamp, int): - ts = self.timestamp - elif isinstance(self.timestamp, (datetime, timedelta)): - ts = self.timestamp - if isinstance(self.timestamp, timedelta): - ts += datetime.now() # Apply timedelta - ts = int(time.mktime(ts.timetuple()) * 1e+6 + ts.microsecond) - else: - raise ValueError("Batch expects a long, a timedelta, or a datetime") + # Map cqlengine batch_type to core BatchType + if self.batch_type is None or self.batch_type is CBatchType.LOGGED: + batch_type = CBatchType.LOGGED + elif self.batch_type == 'UNLOGGED' or self.batch_type is CBatchType.UNLOGGED: + batch_type = CBatchType.UNLOGGED + elif self.batch_type == 'COUNTER' or self.batch_type is CBatchType.COUNTER: + batch_type = CBatchType.COUNTER + else: + batch_type = CBatchType.LOGGED - opener += ' USING TIMESTAMP {0}'.format(ts) + # Calculate timestamp in microseconds if set + timestamp = None + if self.timestamp: + if isinstance(self.timestamp, timedelta): + ts = datetime.now() + self.timestamp + timestamp = int(time.mktime(ts.timetuple()) * 1e+6 + ts.microsecond) + elif isinstance(self.timestamp, datetime): + timestamp = int(time.mktime(self.timestamp.timetuple()) * 1e+6 + self.timestamp.microsecond) + + # Create BatchStatement + batch = BatchStatement( + batch_type=batch_type, + consistency_level=self._consistency, + timestamp=timestamp + ) - query_list = [opener] - parameters = {} - ctx_counter = 0 + # Add each query as a SimpleStatement with parameters for query in self.queries: - query.update_context_id(ctx_counter) - ctx = query.get_context() - ctx_counter += len(ctx) - query_list.append(' ' + str(query)) - parameters.update(ctx) - - query_list.append('APPLY BATCH;') - - tmp = conn.execute('\n'.join(query_list), parameters, self._consistency, self._timeout, connection=self._connection) - check_applied(tmp) + query.update_context_id(0) # Reset context for each query + params = query.get_context() + stmt = SimpleStatement(str(query)) + batch.add(stmt, params) + + # Execute the batch + result = conn.execute(batch, timeout=self._timeout, connection=self._connection) + check_applied(result) self.queries = [] self._execute_callbacks() diff --git a/cassandra/query.py b/cassandra/query.py index 6c6878fdb4..51628271f8 100644 --- a/cassandra/query.py +++ b/cassandra/query.py @@ -759,13 +759,22 @@ class BatchStatement(Statement): supported when using protocol version 3 or higher. """ + timestamp = None + """ + The optional timestamp for all operations in the batch, in microseconds + since the UNIX epoch. If not set, the client timestamp generator or + server time will be used. + + .. versionadded:: 3.29.2 + """ + _statements_and_parameters = None _session = None _is_lwt = False def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None, consistency_level=None, serial_consistency_level=None, - session=None, custom_payload=None): + session=None, custom_payload=None, timestamp=None): """ `batch_type` specifies The :class:`.BatchType` for the batch operation. Defaults to :attr:`.BatchType.LOGGED`. @@ -781,6 +790,10 @@ def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None, updated with any values found in their custom payloads. These are only allowed when using protocol version 4 or higher. + `timestamp` is an optional timestamp for all operations in the batch, + in microseconds since the UNIX epoch. If set, this will override the + client timestamp generator. + Example usage: .. code-block:: python @@ -809,8 +822,12 @@ def __init__(self, batch_type=BatchType.LOGGED, retry_policy=None, .. versionchanged:: 2.6.0 Added `custom_payload` as a parameter + + .. versionchanged:: 3.29.2 + Added `timestamp` as a parameter """ self.batch_type = batch_type + self.timestamp = timestamp self._statements_and_parameters = [] self._session = session Statement.__init__(self, retry_policy=retry_policy, consistency_level=consistency_level,