Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions samples/snippets/data_client/data_client_snippets_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,43 @@ async def write_conditional(project_id, instance_id, table_id):
await write_conditional(table.client.project, table.instance_id, table.table_id)


async def write_aggregate(table):
    """Snippet: increment an aggregate cell with an AddToCell mutation.

    Wraps the region-tagged inner snippet and invokes it with the
    identifiers of the provided test ``table``, matching the pattern of
    the other snippets in this file (e.g. ``write_conditional``).
    """
    # [START bigtable_async_write_aggregate]
    from google.cloud.bigtable.data import BigtableDataClientAsync
    from google.cloud.bigtable.data.mutations import AddToCell, RowMutationEntry
    from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup

    async def write_aggregate(project_id, instance_id, table_id):
        """Increments a value in a Bigtable table using AddToCell mutation."""
        async with BigtableDataClientAsync(project=project_id) as client:
            table = client.get_table(instance_id, table_id)
            row_key = "unique_device_ids_1"
            try:
                async with table.mutations_batcher() as batcher:
                    # The AddToCell mutation increments the value of a cell.
                    # The `counters` family must be set up to be an aggregate
                    # family with an int64 input type.
                    reading = AddToCell(
                        family="counters", qualifier="odometer", value=32304
                    )
                    await batcher.append(
                        RowMutationEntry(row_key.encode("utf-8"), [reading])
                    )
            except MutationsExceptionGroup as e:
                # MutationsExceptionGroup contains a FailedMutationEntryError for
                # each mutation that failed.
                for sub_exception in e.exceptions:
                    failed_entry: RowMutationEntry = sub_exception.entry
                    cause: Exception = sub_exception.__cause__
                    print(
                        f"Failed mutation for row {failed_entry.row_key!r} with error: {cause!r}"
                    )

    # [END bigtable_async_write_aggregate]
    await write_aggregate(table.client.project, table.instance_id, table.table_id)

async def read_row(table):
# [START bigtable_async_reads_row]
from google.cloud.bigtable.data import BigtableDataClientAsync
Expand Down
26 changes: 24 additions & 2 deletions samples/snippets/data_client/data_client_snippets_async_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,27 @@
BIGTABLE_INSTANCE = os.environ["BIGTABLE_INSTANCE"]
TABLE_ID = f"data-client-{str(uuid.uuid4())[:16]}"

@pytest.fixture(scope="session")
def column_family_config():
    """Column-family definitions for the shared test table.

    Returns a dict mapping family name to ``ColumnFamily``. The
    ``counters`` family is configured as an aggregate family (int64
    big-endian input, SUM aggregator) so the ``write_aggregate``
    snippet's AddToCell mutation can be exercised against it.
    """
    from google.cloud.bigtable_admin_v2 import types

    # Aggregate type: cells hold int64 values (big-endian encoded) that
    # are combined with a SUM aggregator on write.
    int_aggregate_type = types.Type.Aggregate(
        input_type=types.Type(int64_type={"encoding": {"big_endian_bytes": {}}}),
        sum={},
    )

    return {
        "family": types.ColumnFamily(),
        "stats_summary": types.ColumnFamily(),
        "counters": types.ColumnFamily(
            value_type=types.Type(aggregate_type=int_aggregate_type)
        ),
    }

@pytest.fixture(scope="session")
def table_id(column_family_config):
    """Create the session-scoped test table with the configured column
    families, yield its id, and tear the table down afterwards."""
    with create_table_cm(PROJECT, BIGTABLE_INSTANCE, TABLE_ID, column_family_config):
        yield TABLE_ID


Expand Down Expand Up @@ -59,6 +76,11 @@ async def test_write_conditional(table):
await data_snippets.write_conditional(table)


@pytest.mark.asyncio
async def test_write_aggregate(table):
    # Runs the write_aggregate snippet end-to-end against the shared test
    # table; passes if no exception escapes the snippet.
    await data_snippets.write_aggregate(table)


@pytest.mark.asyncio
async def test_read_row(table):
    # Runs the read_row snippet end-to-end against the shared test table;
    # passes if no exception escapes the snippet.
    await data_snippets.read_row(table)
Expand Down
Loading