This repository was archived by the owner on Apr 1, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 65
feat: Rerouted CheckAndMutateRows and ReadModifyWriteRows #1257
Merged
Merged
Changes from 3 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
41dcace
chore: Rerouted legacy client row filters to data client row filters.
gkevinzheng f6b90f7
Changed attribute mapping + moved ABC classes to test file + changed
gkevinzheng 8cb8af0
feat: Rerouted CheckAndMutateRows and ReadModifyWriteRows
gkevinzheng 36cfaaf
Addressed review feedback + fixed system tests.
gkevinzheng 4d128c6
Merge branch 'v3_staging' into conditional-row
gkevinzheng 1f38119
linting
gkevinzheng e7d7b1e
Eliminated unnecessary lines of code in row.py
gkevinzheng File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,16 +14,13 @@ | |
|
|
||
| """User-friendly container for Google Cloud Bigtable Row.""" | ||
|
|
||
|
|
||
| import struct | ||
|
|
||
| from google.cloud._helpers import _datetime_from_microseconds # type: ignore | ||
| from google.cloud._helpers import _microseconds_from_datetime # type: ignore | ||
| from google.cloud._helpers import _to_bytes # type: ignore | ||
| from google.cloud.bigtable_v2.types import data as data_v2_pb2 | ||
|
|
||
| from google.cloud.bigtable.data import mutations | ||
| from google.cloud.bigtable.data import read_modify_write_rules as rmw_rules | ||
|
|
||
| _PACK_I64 = struct.Struct(">q").pack | ||
|
|
||
| MAX_MUTATIONS = 100000 | ||
| """The maximum number of mutations that a row can accumulate.""" | ||
|
|
@@ -157,26 +154,21 @@ def _set_cell(self, column_family_id, column, value, timestamp=None, state=None) | |
| :param state: (Optional) The state that is passed along to | ||
| :meth:`_get_mutations`. | ||
| """ | ||
| column = _to_bytes(column) | ||
| if isinstance(value, int): | ||
| value = _PACK_I64(value) | ||
| value = _to_bytes(value) | ||
| if timestamp is None: | ||
| # Use -1 for current Bigtable server time. | ||
| timestamp_micros = -1 | ||
| # Use current Bigtable server time. | ||
| timestamp_micros = mutations._SERVER_SIDE_TIMESTAMP | ||
| else: | ||
| timestamp_micros = _microseconds_from_datetime(timestamp) | ||
| # Truncate to millisecond granularity. | ||
| timestamp_micros -= timestamp_micros % 1000 | ||
|
|
||
| mutation_val = data_v2_pb2.Mutation.SetCell( | ||
| family_name=column_family_id, | ||
| column_qualifier=column, | ||
| mutation = mutations.SetCell( | ||
| family=column_family_id, | ||
| qualifier=column, | ||
| new_value=value, | ||
| timestamp_micros=timestamp_micros, | ||
| value=value, | ||
| ) | ||
| mutation_pb = data_v2_pb2.Mutation(set_cell=mutation_val) | ||
| self._get_mutations(state).append(mutation_pb) | ||
| self._get_mutations(state).append(mutation) | ||
|
|
||
| def _delete(self, state=None): | ||
| """Helper for :meth:`delete` | ||
|
|
@@ -191,9 +183,7 @@ def _delete(self, state=None): | |
| :param state: (Optional) The state that is passed along to | ||
| :meth:`_get_mutations`. | ||
| """ | ||
| mutation_val = data_v2_pb2.Mutation.DeleteFromRow() | ||
| mutation_pb = data_v2_pb2.Mutation(delete_from_row=mutation_val) | ||
| self._get_mutations(state).append(mutation_pb) | ||
| self._get_mutations(state).append(mutations.DeleteAllFromRow()) | ||
|
|
||
| def _delete_cells(self, column_family_id, columns, time_range=None, state=None): | ||
| """Helper for :meth:`delete_cell` and :meth:`delete_cells`. | ||
|
|
@@ -222,27 +212,28 @@ def _delete_cells(self, column_family_id, columns, time_range=None, state=None): | |
| """ | ||
| mutations_list = self._get_mutations(state) | ||
| if columns is self.ALL_COLUMNS: | ||
| mutation_val = data_v2_pb2.Mutation.DeleteFromFamily( | ||
| family_name=column_family_id | ||
| self._get_mutations(state).append( | ||
| mutations.DeleteAllFromFamily(family_to_delete=column_family_id) | ||
| ) | ||
| mutation_pb = data_v2_pb2.Mutation(delete_from_family=mutation_val) | ||
| mutations_list.append(mutation_pb) | ||
| else: | ||
| delete_kwargs = {} | ||
| start_timestamp_micros = None | ||
| end_timestamp_micros = None | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: these lines aren't needed |
||
| if time_range is not None: | ||
| delete_kwargs["time_range"] = time_range.to_pb() | ||
| timestamps = time_range._to_dict() | ||
| start_timestamp_micros = timestamps.get("start_timestamp_micros") | ||
| end_timestamp_micros = timestamps.get("end_timestamp_micros") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this logic might be a bit cleaner like this: But it's fine as-is too |
||
|
|
||
| to_append = [] | ||
| for column in columns: | ||
| column = _to_bytes(column) | ||
| # time_range will never change if present, but the rest of | ||
| # delete_kwargs will | ||
| delete_kwargs.update( | ||
| family_name=column_family_id, column_qualifier=column | ||
| to_append.append( | ||
| mutations.DeleteRangeFromColumn( | ||
| family=column_family_id, | ||
| qualifier=column, | ||
| start_timestamp_micros=start_timestamp_micros, | ||
| end_timestamp_micros=end_timestamp_micros, | ||
| ) | ||
| ) | ||
| mutation_val = data_v2_pb2.Mutation.DeleteFromColumn(**delete_kwargs) | ||
| mutation_pb = data_v2_pb2.Mutation(delete_from_column=mutation_val) | ||
| to_append.append(mutation_pb) | ||
|
|
||
| # We don't add the mutations until all columns have been | ||
| # processed without error. | ||
|
|
@@ -284,7 +275,7 @@ class DirectRow(_SetDeleteRow): | |
|
|
||
| def __init__(self, row_key, table=None): | ||
| super(DirectRow, self).__init__(row_key, table) | ||
| self._pb_mutations = [] | ||
| self._mutations = [] | ||
|
|
||
| def _get_mutations(self, state=None): # pylint: disable=unused-argument | ||
| """Gets the list of mutations for a given state. | ||
|
|
@@ -299,7 +290,12 @@ def _get_mutations(self, state=None): # pylint: disable=unused-argument | |
| :rtype: list | ||
| :returns: The list to add new mutations to (for the current state). | ||
| """ | ||
| return self._pb_mutations | ||
| return self._mutations | ||
|
|
||
| def _get_mutation_pbs(self): | ||
| """Gets the list of mutation protos.""" | ||
|
|
||
| return [mut._to_pb() for mut in self._get_mutations()] | ||
|
|
||
| def get_mutations_size(self): | ||
| """Gets the total mutations size for current row | ||
|
|
@@ -313,7 +309,7 @@ def get_mutations_size(self): | |
| """ | ||
|
|
||
| mutation_size = 0 | ||
| for mutation in self._get_mutations(): | ||
| for mutation in self._get_mutation_pbs(): | ||
| mutation_size += mutation._pb.ByteSize() | ||
|
|
||
| return mutation_size | ||
|
|
@@ -486,7 +482,7 @@ def clear(self): | |
| :end-before: [END bigtable_api_row_clear] | ||
| :dedent: 4 | ||
| """ | ||
| del self._pb_mutations[:] | ||
| del self._mutations[:] | ||
|
|
||
|
|
||
| class ConditionalRow(_SetDeleteRow): | ||
|
|
@@ -597,17 +593,15 @@ def commit(self): | |
| % (MAX_MUTATIONS, num_true_mutations, num_false_mutations) | ||
| ) | ||
|
|
||
| data_client = self._table._instance._client.table_data_client | ||
| resp = data_client.check_and_mutate_row( | ||
| table_name=self._table.name, | ||
| table = self._table._table_impl | ||
| resp = table.check_and_mutate_row( | ||
| row_key=self._row_key, | ||
| predicate_filter=self._filter.to_pb(), | ||
| app_profile_id=self._table._app_profile_id, | ||
| true_mutations=true_mutations, | ||
| false_mutations=false_mutations, | ||
| predicate=self._filter, | ||
| true_case_mutations=true_mutations, | ||
| false_case_mutations=false_mutations, | ||
| ) | ||
| self.clear() | ||
| return resp.predicate_matched | ||
| return resp | ||
|
|
||
| # pylint: disable=arguments-differ | ||
| def set_cell(self, column_family_id, column, value, timestamp=None, state=True): | ||
|
|
@@ -797,7 +791,7 @@ class AppendRow(Row): | |
|
|
||
| def __init__(self, row_key, table): | ||
| super(AppendRow, self).__init__(row_key, table) | ||
| self._rule_pb_list = [] | ||
| self._rule_list = [] | ||
|
|
||
| def clear(self): | ||
| """Removes all currently accumulated modifications on current row. | ||
|
|
@@ -809,7 +803,7 @@ def clear(self): | |
| :end-before: [END bigtable_api_row_clear] | ||
| :dedent: 4 | ||
| """ | ||
| del self._rule_pb_list[:] | ||
| del self._rule_list[:] | ||
|
|
||
| def append_cell_value(self, column_family_id, column, value): | ||
| """Appends a value to an existing cell. | ||
|
|
@@ -842,12 +836,11 @@ def append_cell_value(self, column_family_id, column, value): | |
| the targeted cell is unset, it will be treated as | ||
| containing the empty string. | ||
| """ | ||
| column = _to_bytes(column) | ||
| value = _to_bytes(value) | ||
| rule_pb = data_v2_pb2.ReadModifyWriteRule( | ||
| family_name=column_family_id, column_qualifier=column, append_value=value | ||
| self._rule_list.append( | ||
| rmw_rules.AppendValueRule( | ||
| family=column_family_id, qualifier=column, append_value=value | ||
| ) | ||
| ) | ||
| self._rule_pb_list.append(rule_pb) | ||
|
|
||
| def increment_cell_value(self, column_family_id, column, int_value): | ||
| """Increments a value in an existing cell. | ||
|
|
@@ -886,13 +879,11 @@ def increment_cell_value(self, column_family_id, column, int_value): | |
| big-endian signed integer), or the entire request | ||
| will fail. | ||
| """ | ||
| column = _to_bytes(column) | ||
| rule_pb = data_v2_pb2.ReadModifyWriteRule( | ||
| family_name=column_family_id, | ||
| column_qualifier=column, | ||
| increment_amount=int_value, | ||
| self._rule_list.append( | ||
| rmw_rules.IncrementRule( | ||
| family=column_family_id, qualifier=column, increment_amount=int_value | ||
| ) | ||
| ) | ||
| self._rule_pb_list.append(rule_pb) | ||
|
|
||
| def commit(self): | ||
| """Makes a ``ReadModifyWriteRow`` API request. | ||
|
|
@@ -925,7 +916,7 @@ def commit(self): | |
| :raises: :class:`ValueError <exceptions.ValueError>` if the number of | ||
| mutations exceeds the :data:`MAX_MUTATIONS`. | ||
| """ | ||
| num_mutations = len(self._rule_pb_list) | ||
| num_mutations = len(self._rule_list) | ||
| if num_mutations == 0: | ||
| return {} | ||
| if num_mutations > MAX_MUTATIONS: | ||
|
|
@@ -934,12 +925,10 @@ def commit(self): | |
| "allowable %d." % (num_mutations, MAX_MUTATIONS) | ||
| ) | ||
|
|
||
| data_client = self._table._instance._client.table_data_client | ||
| row_response = data_client.read_modify_write_row( | ||
| table_name=self._table.name, | ||
| table = self._table._table_impl | ||
| row_response = table.read_modify_write_row( | ||
| row_key=self._row_key, | ||
| rules=self._rule_pb_list, | ||
| app_profile_id=self._table._app_profile_id, | ||
| rules=self._rule_list, | ||
| ) | ||
|
|
||
| # Reset modifications after commit-ing request. | ||
|
|
@@ -983,47 +972,13 @@ def _parse_rmw_row_response(row_response): | |
| } | ||
| """ | ||
| result = {} | ||
| for column_family in row_response.row.families: | ||
| column_family_id, curr_family = _parse_family_pb(column_family) | ||
| result[column_family_id] = curr_family | ||
| for cell in row_response.cells: | ||
| result.setdefault(cell.family, {}).setdefault(cell.qualifier, []).append( | ||
| (cell.value, _datetime_from_microseconds(cell.timestamp_micros)) | ||
| ) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this is a bit complex for a one-liner |
||
| return result | ||
|
|
||
|
|
||
| def _parse_family_pb(family_pb): | ||
| """Parses a Family protobuf into a dictionary. | ||
|
|
||
| :type family_pb: :class:`._generated.data_pb2.Family` | ||
| :param family_pb: A protobuf | ||
|
|
||
| :rtype: tuple | ||
| :returns: A string and dictionary. The string is the name of the | ||
| column family and the dictionary has column names (within the | ||
| family) as keys and cell lists as values. Each cell is | ||
| represented with a two-tuple with the value (in bytes) and the | ||
| timestamp for the cell. For example: | ||
|
|
||
| .. code:: python | ||
|
|
||
| { | ||
| b'col-name1': [ | ||
| (b'cell-val', datetime.datetime(...)), | ||
| (b'cell-val-newer', datetime.datetime(...)), | ||
| ], | ||
| b'col-name2': [ | ||
| (b'altcol-cell-val', datetime.datetime(...)), | ||
| ], | ||
| } | ||
| """ | ||
| result = {} | ||
| for column in family_pb.columns: | ||
| result[column.qualifier] = cells = [] | ||
| for cell in column.cells: | ||
| val_pair = (cell.value, _datetime_from_microseconds(cell.timestamp_micros)) | ||
| cells.append(val_pair) | ||
|
|
||
| return family_pb.name, result | ||
|
|
||
|
|
||
| class PartialRowData(object): | ||
| """Representation of partial row in a Google Cloud Bigtable Table. | ||
|
|
||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like you aren't using mutations_list, but it's still set at the top of the function (and then accessed at the bottom)