Skip to content

Commit c207d65

Browse files
committed
feat(gooddata-sdk): [AUTO] Add AI Lake ObjectStorage listing and ColumnExpression for pipe tables
1 parent 1c4dfe4 commit c207d65

7 files changed

Lines changed: 391 additions & 11 deletions

File tree

packages/gooddata-sdk/src/gooddata_sdk/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
import logging
88

99
from gooddata_sdk._version import __version__
10+
from gooddata_sdk.catalog.ai_lake.entity_model.column_expression import (
11+
CatalogColumnExpression,
12+
ColumnExpressionFunction,
13+
)
14+
from gooddata_sdk.catalog.ai_lake.entity_model.object_storage import CatalogObjectStorageInfo
1015
from gooddata_sdk.catalog.ai_lake.service import (
1116
CatalogAILakeOperation,
1217
CatalogAILakeOperationError,
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# (C) 2026 GoodData Corporation
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# (C) 2026 GoodData Corporation
2+
"""SDK model for AI Lake pipe-table ColumnExpression projections."""
3+
4+
from __future__ import annotations
5+
6+
from typing import Literal
7+
8+
import attrs
9+
from gooddata_api_client.model.column_expression import ColumnExpression
10+
11+
# Closed set of function names that ``CatalogColumnExpression.function`` is annotated with.
# This is a typing-only alias: ``Literal`` guides static type checkers and performs no
# runtime validation by itself.
ColumnExpressionFunction = Literal["HLL_HASH", "BITMAP_HASH", "BITMAP_HASH64", "TO_BITMAP"]
12+
"""StarRocks transform functions supported in pipe-table column projection overrides."""
13+
14+
15+
@attrs.define(kw_only=True)
class CatalogColumnExpression:
    """Projection override applied to one target column of a pipe table.

    An instance renders as ``<function>(<column>) AS <target_column>`` inside
    the ``SELECT`` list of the generated ``CREATE PIPE … AS INSERT`` statement.
    AGGREGATE-KEY tables that include native HLL or BITMAP columns need this,
    because StarRocks rejects raw VARBINARY values into those column types.

    The target column name is not stored on the instance. It is the key of the
    ``{target_column: CatalogColumnExpression}`` mapping passed as the
    ``column_expressions`` argument of
    :py:meth:`~gooddata_sdk.catalog.ai_lake.service.CatalogAILakeService.create_pipe_table`.

    Example::

        from gooddata_sdk import CatalogColumnExpression

        exprs = {
            "user_hll": CatalogColumnExpression(column="user_id", function="HLL_HASH"),
            "page_bmp": CatalogColumnExpression(column="page_id", function="TO_BITMAP"),
        }
    """

    column: str
    """Source column as produced by parquet schema inference (after ``columnOverrides``)."""

    function: ColumnExpressionFunction
    """StarRocks transform applied to *column* when it is projected."""

    def as_api_model(self) -> ColumnExpression:
        """Build the auto-generated ``ColumnExpression`` API model for this override."""
        model_fields = {"column": self.column, "function": self.function}
        # The generated model is constructed with its own type checking switched off.
        return ColumnExpression(_check_type=False, **model_fields)
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# (C) 2026 GoodData Corporation
2+
"""SDK model for AI Lake ObjectStorage descriptors."""
3+
4+
from __future__ import annotations
5+
6+
from typing import Any
7+
8+
import attrs
9+
10+
11+
@attrs.define(kw_only=True)
class CatalogObjectStorageInfo:
    """Safe descriptor of one ObjectStorage registered in AI Lake.

    The server strips provider credentials, so only non-sensitive descriptors
    (id, name, type, and provider-specific metadata like bucket/region) are
    carried here. :attr:`name` is the value to use as ``source_storage_name``
    when calling
    :py:meth:`~gooddata_sdk.catalog.ai_lake.service.CatalogAILakeService.create_pipe_table`;
    :attr:`storage_id` is the value to put in the ``storageIds`` list of a
    ``ProvisionDatabase`` request.
    """

    name: str
    """Human-readable name of the storage configuration."""

    storage_id: str
    """Stable UUID identifier of the storage configuration."""

    storage_type: str
    """Provider type (e.g. ``S3``, ``MINIO``, ``ADLS``)."""

    storage_config: dict[str, str] = attrs.field(factory=dict)
    """Provider-specific descriptors (bucket, region, endpoint, …).

    Credential references (keys ending in ``_env``) are stripped by the server.
    """

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> CatalogObjectStorageInfo:
        """Build an instance from a snake_case dict, as produced by the API client's ``to_dict()``.

        ``name``, ``storage_id`` and ``storage_type`` are mandatory keys (a
        missing one raises ``KeyError``); ``storage_config`` is optional.
        """
        mandatory = {key: data[key] for key in ("name", "storage_id", "storage_type")}
        # A missing, null or otherwise falsy ``storage_config`` collapses to an empty dict.
        config = data.get("storage_config") or {}
        return cls(storage_config=config, **mandatory)

packages/gooddata-sdk/src/gooddata_sdk/catalog/ai_lake/service.py

Lines changed: 110 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,16 @@
11
# (C) 2026 GoodData Corporation
2-
"""SDK wrapper for the AI Lake long-running-operation surface.
2+
"""SDK wrapper for the AI Lake API surface.
33
4-
Today this exposes only the operations needed by aggregate-aware LDMs:
4+
Currently exposed operations:
55
6-
- `analyze_statistics` triggers `ANALYZE TABLE` over a database instance so
7-
CBO statistics catch up after a schema or data change. Required after
8-
registering a pre-aggregation table whose dim attributes the platform will
9-
later resolve via filter pushdown.
6+
- `list_object_storages` lists ObjectStorages registered for the organization.
7+
Use the returned names as ``source_storage_name`` in `create_pipe_table`.
8+
- `create_pipe_table` registers a pipe table in a database instance, with
9+
optional `CatalogColumnExpression` overrides for HLL / BITMAP columns.
10+
- `analyze_statistics` triggers ``ANALYZE TABLE`` over a database instance so
11+
CBO statistics catch up after a schema or data change.
1012
- `get_operation` and `wait_for_operation` cover the polling side of the
1113
long-running operation contract that `analyze_statistics` returns.
12-
13-
The full AI Lake API surface (database provisioning, pipe-table
14-
registration, service commands) is not yet wrapped here; consumers that
15-
need those should call `client.ai_lake_api.<method>` directly until a
16-
ticket adds typed wrappers.
1714
"""
1815

1916
from __future__ import annotations
@@ -25,7 +22,10 @@
2522
from attrs import define
2623
from gooddata_api_client.api.ai_lake_api import AILakeApi
2724
from gooddata_api_client.model.analyze_statistics_request import AnalyzeStatisticsRequest
25+
from gooddata_api_client.model.create_pipe_table_request import CreatePipeTableRequest
2826

27+
from gooddata_sdk.catalog.ai_lake.entity_model.column_expression import CatalogColumnExpression
28+
from gooddata_sdk.catalog.ai_lake.entity_model.object_storage import CatalogObjectStorageInfo
2929
from gooddata_sdk.catalog.base import Base
3030
from gooddata_sdk.client import GoodDataApiClient
3131

@@ -76,6 +76,105 @@ def __init__(self, api_client: GoodDataApiClient) -> None:
7676
self._client = api_client
7777
self._ai_lake_api: AILakeApi = AILakeApi(api_client._api_client)
7878

79+
# ------------------------------------------------------------------
80+
# ObjectStorage listing
81+
# ------------------------------------------------------------------
82+
83+
def list_object_storages(self) -> list[CatalogObjectStorageInfo]:
    """List ObjectStorages registered for the organization.

    Provider credentials are stripped server-side — only safe descriptors
    (id, name, type, bucket, region, endpoint, …) are returned.

    Use the returned :attr:`~CatalogObjectStorageInfo.name` as
    ``source_storage_name`` when calling :meth:`create_pipe_table`, or
    pass :attr:`~CatalogObjectStorageInfo.storage_id` to the
    ``ProvisionDatabase`` ``storageIds`` list.

    Returns:
        List of :class:`CatalogObjectStorageInfo` in the order the server
        returned them; no client-side sorting is applied. The list is empty
        when the response carries no ``storages`` entry or the entry is null.
    """
    response = self._ai_lake_api.list_ai_lake_object_storages(_check_return_type=False)
    # Accept either an object exposing ``to_dict()`` or a plain mapping.
    data = response.to_dict() if hasattr(response, "to_dict") else dict(response)
    # ``or []`` rather than a ``.get`` default: a default only covers a missing key,
    # while an explicit null value would otherwise make the iteration raise TypeError.
    return [CatalogObjectStorageInfo.from_dict(s) for s in data.get("storages") or []]
100+
101+
# ------------------------------------------------------------------
102+
# Pipe-table management
103+
# ------------------------------------------------------------------
104+
105+
def create_pipe_table(
    self,
    instance_id: str,
    table_name: str,
    source_storage_name: str,
    path_prefix: str,
    *,
    column_expressions: dict[str, CatalogColumnExpression] | None = None,
    column_overrides: dict[str, str] | None = None,
    aggregation_overrides: dict[str, str] | None = None,
    max_varchar_length: int | None = None,
    polling_interval_seconds: int | None = None,
    table_properties: dict[str, str] | None = None,
) -> None:
    """Register a new pipe table in an AI Lake database instance.

    Only the optional arguments that are not ``None`` are forwarded in the
    request; the rest are omitted so the server applies its own defaults.

    Args:
        instance_id: Database instance name (preferred) or UUID.
        table_name: OLAP table name. Must match ``^[a-z][a-z0-9_-]{0,62}$``.
        source_storage_name: Name of a registered ObjectStorage
            (:meth:`list_object_storages` returns the available names).
        path_prefix: Path prefix to the parquet files in the storage
            (e.g. ``'my-dataset/year=2024/'``).
        column_expressions: Projection overrides keyed by target column
            name. Each :class:`CatalogColumnExpression` value emits
            ``<function>(<column>) AS <key>`` in the SELECT list of the
            generated ``CREATE PIPE … AS INSERT`` statement. Required for
            AGGREGATE-KEY tables that include native HLL or BITMAP columns.
        column_overrides: Replacement types for inferred columns, e.g.
            ``{"year": "INT", "event_date": "DATE"}``.
        aggregation_overrides: StarRocks aggregation function per non-key
            column (``SUM``, ``MIN``, ``MAX``, ``REPLACE``, ``HLL_UNION``,
            ``BITMAP_UNION``, …). Every non-key column needs one when the
            table uses an aggregate key.
        max_varchar_length: Upper bound N for VARCHAR(N) columns; 0 means
            no cap.
        polling_interval_seconds: Interval, in seconds, at which the pipe
            polls for new files; 0 or ``None`` uses the server default.
        table_properties: ``CREATE TABLE PROPERTIES`` key-value pairs.
            Defaults to ``{"replication_num": "1"}`` server-side.
    """
    # Convert SDK expressions to API models up front; ``None`` stays ``None`` so it is filtered below.
    api_expressions = (
        None
        if column_expressions is None
        else {target: expression.as_api_model() for target, expression in column_expressions.items()}
    )
    candidates: dict[str, Any] = {
        "column_expressions": api_expressions,
        "column_overrides": column_overrides,
        "aggregation_overrides": aggregation_overrides,
        "max_varchar_length": max_varchar_length,
        "polling_interval_seconds": polling_interval_seconds,
        "table_properties": table_properties,
    }
    # Filter on ``is not None`` (not truthiness): empty dicts and 0 are still sent.
    supplied = {field: value for field, value in candidates.items() if value is not None}

    request = CreatePipeTableRequest(
        table_name=table_name,
        source_storage_name=source_storage_name,
        path_prefix=path_prefix,
        _check_type=False,
        **supplied,
    )
    self._ai_lake_api.create_ai_lake_pipe_table(instance_id, request, _check_return_type=False)
173+
174+
# ------------------------------------------------------------------
175+
# Statistics
176+
# ------------------------------------------------------------------
177+
79178
def analyze_statistics(
80179
self,
81180
instance_id: str,
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# (C) 2026 GoodData Corporation
2+
"""Integration tests for AI Lake SDK methods backed by VCR cassettes.
3+
4+
These tests exercise the HTTP surface of `CatalogAILakeService`. Each test
5+
needs a cassette recorded against a live stack; the cassette files are created
6+
by the recorder and are not hand-edited.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
from pathlib import Path
12+
13+
from gooddata_sdk import CatalogObjectStorageInfo, GoodDataSdk
14+
from tests_support.vcrpy_utils import get_vcr
15+
16+
gd_vcr = get_vcr()
17+
18+
_current_dir = Path(__file__).parent.absolute()
19+
_fixtures_dir = _current_dir / "fixtures" / "ai_lake"
20+
21+
22+
@gd_vcr.use_cassette(str(_fixtures_dir / "test_list_ai_lake_object_storages.yaml"))
def test_list_ai_lake_object_storages(test_config):
    """Check that listing AI Lake ObjectStorages yields well-formed descriptors."""
    client = GoodDataSdk.create(host_=test_config["host"], token_=test_config["token"])
    listed = client.catalog_ai_lake.list_object_storages()

    assert isinstance(listed, list)
    # Every returned entry must be a fully populated descriptor.
    for entry in listed:
        assert isinstance(entry, CatalogObjectStorageInfo)
        assert entry.name, "name must be non-empty"
        assert entry.storage_id, "storage_id must be non-empty"
        assert entry.storage_type, "storage_type must be non-empty"
        assert isinstance(entry.storage_config, dict)

0 commit comments

Comments
 (0)