Skip to content

Commit 0c31a2f

Browse files
committed
Add a refresh_cube_materialization endpoint and adjust the UI messaging.
1 parent 0a5ea10 commit 0c31a2f

9 files changed

Lines changed: 595 additions & 334 deletions

File tree

datajunction-query/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ dependencies = [
2626
"snowflake-connector-python>=3.3.1",
2727
"pyyaml>=6.0.1",
2828
"trino>=0.324.0",
29-
"psycopg[async,pool]>=3.2.1",
29+
"psycopg[pool]>=3.2.1",
3030
"sqlalchemy>=2.0.34",
3131
"pytest-asyncio>=0.24.0",
3232
"pytest-integration>=0.2.3",
@@ -64,7 +64,7 @@ test = [
6464
"pydruid>=0.6.4",
6565
"typing-extensions>=4.3.0",
6666
"httpx>=0.24.1",
67-
"psycopg[async,pool]>=3.2.1",
67+
"psycopg[pool]>=3.2.1",
6868
"testcontainers[postgres]>=4.8.1",
6969
"types-toml",
7070
]

datajunction-server/datajunction_server/internal/nodes.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,7 @@ async def update_cube_node(
12791279
"""
12801280
node = await Node.get_cube_by_name(session, node_revision.name)
12811281
node_revision = node.current # type: ignore
1282-
minor_changes = has_minor_changes(node_revision, data) or refresh_materialization
1282+
minor_changes = has_minor_changes(node_revision, data)
12831283
old_metrics = [m.name for m in node_revision.cube_metrics()]
12841284
old_dimensions = node_revision.cube_dimensions()
12851285
major_changes = (data.metrics and data.metrics != old_metrics) or (
@@ -1297,6 +1297,52 @@ async def update_cube_node(
12971297
limit=data.limit or None,
12981298
)
12991299
if not major_changes and not minor_changes:
1300+
# If refresh_materialization requested but no other changes, refresh and return
1301+
if refresh_materialization and query_service_client:
1302+
_logger.info(
1303+
"Refreshing materializations for cube=%s without version change",
1304+
node_revision.name,
1305+
)
1306+
# Re-activate deactivated materializations that match the current version
1307+
current_version = node_revision.version
1308+
for mat in node_revision.materializations:
1309+
mat_config = mat.config if isinstance(mat.config, dict) else {}
1310+
mat_version = mat_config.get("cube", {}).get("version", "")
1311+
if mat.deactivated_at and mat_version == current_version:
1312+
_logger.info(
1313+
"Re-activating materialization %s for cube=%s version=%s",
1314+
mat.name,
1315+
node_revision.name,
1316+
current_version,
1317+
)
1318+
mat.deactivated_at = None
1319+
await session.commit()
1320+
1321+
# Serialize active materializations as complete, ready-to-use dicts
1322+
# so dj-query can parse them directly into model inputs without merging.
1323+
# This avoids a callback from query service back to DJ.
1324+
active_mats = []
1325+
for mat in node_revision.materializations:
1326+
if mat.deactivated_at:
1327+
continue
1328+
mat_dict = {
1329+
**(mat.config if isinstance(mat.config, dict) else {}),
1330+
"name": mat.name,
1331+
"job": mat.job,
1332+
"strategy": mat.strategy.value if mat.strategy else None,
1333+
"schedule": mat.schedule,
1334+
"cube": {
1335+
"name": node_revision.name,
1336+
"version": node_revision.version,
1337+
},
1338+
}
1339+
active_mats.append(mat_dict)
1340+
query_service_client.refresh_cube_materialization(
1341+
cube_name=node_revision.name,
1342+
cube_version=node_revision.version,
1343+
materializations=active_mats,
1344+
request_headers=request_headers,
1345+
)
13001346
return None
13011347

13021348
# Disable autoflush to prevent partial state from being persisted if an error

datajunction-server/datajunction_server/query_clients/http.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,21 @@ def deactivate_materialization(
206206
request_headers=request_headers,
207207
)
208208

209+
def refresh_cube_materialization(
    self,
    cube_name: str,
    cube_version: Optional[str] = None,
    materializations: Optional[List[Dict[str, Any]]] = None,
    request_headers: Optional[Dict[str, str]] = None,
) -> MaterializationInfo:
    """
    Ask the HTTP query service to refresh/rebuild a cube's materialization workflows.

    Every argument is forwarded unchanged, by keyword, to the wrapped client's
    method of the same name, and that method's result is returned as-is.
    """
    forwarded = {
        "cube_name": cube_name,
        "cube_version": cube_version,
        "materializations": materializations,
        "request_headers": request_headers,
    }
    refreshed = self._client.refresh_cube_materialization(**forwarded)
    return refreshed
223+
209224
def get_materialization_info(
210225
self,
211226
node_name: str,

datajunction-server/datajunction_server/service_clients.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,74 @@ def deactivate_materialization(
541541
)
542542
return MaterializationInfo(**result)
543543

544+
def refresh_cube_materialization(
    self,
    cube_name: str,
    cube_version: Optional[str] = None,
    materializations: Optional[List[Dict]] = None,
    request_headers: Optional[Dict[str, str]] = None,
) -> MaterializationInfo:
    """
    Refresh/rebuild materialization workflows for a cube without creating a new version.

    Sends `POST /cubes/{cube_name}/refresh-materialization` to the query service.

    Args:
        cube_name: Name of the cube node; interpolated into the endpoint path.
        cube_version: Optional cube version. Sent as the `cube_version` query
            parameter only when truthy; otherwise no query parameters are sent.
        materializations: Materialization dicts to rebuild. Placed in the JSON
            body under `materializations` whenever it is not None (an empty
            list is still sent); when None the body is an empty object.
        request_headers: Optional HTTP headers, layered on top of the session
            headers (a request header replaces a session header with the
            same key).

    Returns:
        MaterializationInfo built from the query service's JSON response. When
        the response status is anything other than 200 or 201, the failure is
        logged and an empty MaterializationInfo (no urls, no output tables) is
        returned instead of raising.
    """
    refresh_endpoint = f"/cubes/{cube_name}/refresh-materialization"
    params = {}
    if cube_version:
        params["cube_version"] = cube_version

    _logger.info(
        "[DJQS] Refreshing materializations for cube=%s version=%s",
        cube_name,
        cube_version or "latest",
    )

    # Merge request headers with session headers (request headers win)
    headers = dict(self.requests_session.headers)
    if request_headers:
        headers.update(request_headers)

    # Build request body with materialization data
    body = {}
    if materializations is not None:
        body["materializations"] = materializations

    response = self.requests_session.post(
        refresh_endpoint,
        headers=headers,
        params=params,
        json=body,
    )

    if response.status_code not in (200, 201):  # pragma: no cover
        # No exception is being handled here, so log with error() rather than
        # exception(): the latter would attach a meaningless "NoneType: None"
        # traceback to the record.
        _logger.error(
            "[DJQS] Failed to refresh materializations for cube=%s with `POST %s`: %s",
            cube_name,
            refresh_endpoint,
            response.text,
        )
        return MaterializationInfo(urls=[], output_tables=[])

    result = response.json()
    _logger.info(
        "[DJQS] Successfully refreshed materializations for cube=%s with `POST %s`",
        cube_name,
        refresh_endpoint,
    )
    return MaterializationInfo(**result)
611+
544612
def get_materialization_info(
545613
self,
546614
node_name: str,

datajunction-server/tests/api/cubes_test.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from datajunction_server.models.query import ColumnMetadata, V3ColumnMetadata
1818
from datajunction_server.service_clients import QueryServiceClient
1919
from datajunction_server.sql.parsing.backends.antlr4 import parse
20-
from datajunction_server.utils import get_query_service_client
20+
from datajunction_server.utils import get_query_service_client, get_session
2121
from tests.sql.utils import compare_query_strings
2222
from tests.construction.build_v3 import assert_sql_equal
2323

@@ -4283,3 +4283,93 @@ async def test_backfill_cube_query_service_failure(
42834283
)
42844284
assert response.status_code == 500
42854285
assert "failed to run cube backfill" in response.json()["message"].lower()
4286+
4287+
4288+
class TestCubeRefreshMaterialization:
    """Exercises cube materialization refresh when the cube version must not move."""

    @pytest.mark.asyncio
    async def test_refresh_materialization_no_version_bump(
        self,
        client_with_repairs_cube: AsyncClient,
        mocker,
    ):
        """
        A PATCH carrying refresh_materialization=true and an empty payload should
        invoke refresh_cube_materialization and leave the cube version unchanged.
        """
        from datetime import datetime, timezone

        from sqlalchemy import select

        from datajunction_server.database.materialization import Materialization
        from datajunction_server.database.node import NodeRevision

        client = client_with_repairs_cube
        overrides = client.app.dependency_overrides
        cube_name = "default.test_refresh_cube"
        node_url = f"/nodes/{cube_name}/"

        await make_a_test_cube(client, cube_name, with_materialization=True)

        # Record the version before any refresh happens
        before = await client.get(node_url)
        assert before.status_code == 200
        initial_version = before.json()["version"]

        # Attach a deactivated materialization tagged with the current version
        # so that the re-activation branch is exercised
        session = overrides[get_session]()
        rows = await session.execute(
            select(NodeRevision).where(NodeRevision.name == cube_name),
        )
        node_rev = rows.scalars().first()
        deactivated_mat = Materialization(
            node_revision_id=node_rev.id,
            name="deactivated_mat",
            strategy=None,
            schedule="",
            config={"cube": {"version": initial_version}},
            job="DruidCubeMaterializationJob",
            deactivated_at=datetime.now(timezone.utc),
        )
        session.add(deactivated_mat)
        await session.commit()

        # Stub out the refresh call on the query service client
        qs_client = overrides[get_query_service_client]()
        mock_refresh = mocker.patch.object(
            qs_client,
            "refresh_cube_materialization",
            return_value=mocker.MagicMock(urls=["http://workflow/refreshed"]),
        )

        # Nothing in the payload changes the cube; only the refresh flag is set
        patched = await client.patch(
            f"/nodes/{cube_name}/?refresh_materialization=true",
            json={},
        )
        assert patched.status_code == 200

        # The refresh must have been requested exactly once, with the expected arguments
        mock_refresh.assert_called_once()
        sent = mock_refresh.call_args.kwargs
        assert sent["cube_name"] == cube_name
        assert sent["cube_version"] == initial_version
        assert isinstance(sent["materializations"], list)
        assert len(sent["materializations"]) > 0
        # The previously deactivated materialization is expected in the payload
        assert "deactivated_mat" in [mat["name"] for mat in sent["materializations"]]

        # ...and its deactivation timestamp should be cleared in the database
        await session.refresh(deactivated_mat)
        assert deactivated_mat.deactivated_at is None

        # The cube version is the same as before the PATCH
        after = await client.get(node_url)
        assert after.json()["version"] == initial_version

datajunction-server/tests/query_clients/http_query_client_test.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,27 @@ def test_run_cube_backfill(self, mock_client):
102102
backfill_input=backfill_input,
103103
request_headers={"X-Test": "1"},
104104
)
105+
106+
def test_refresh_cube_materialization(self, mock_client):
    """refresh_cube_materialization should hand its arguments straight to the inner client."""
    client, mock_inner = mock_client
    inner_refresh = mock_inner.refresh_cube_materialization
    inner_refresh.return_value = MagicMock(urls=["http://wf1"])

    # The same keyword arguments are used for the call and for the expectation
    call_kwargs = {
        "cube_name": "test.cube",
        "cube_version": "v1.0",
        "materializations": [
            {"name": "mat1", "job": "DruidCubeMaterializationJob"},
        ],
        "request_headers": {"X-Test": "1"},
    }
    client.refresh_cube_materialization(**call_kwargs)

    inner_refresh.assert_called_once_with(**call_kwargs)

datajunction-server/tests/service_clients_test.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,3 +1181,82 @@ def test_run_cube_backfill_failure(self, mocker: MockerFixture) -> None:
11811181
with pytest.raises(Exception) as exc_info:
11821182
query_service_client.run_cube_backfill(backfill_input)
11831183
assert "Query service error" in str(exc_info.value)
1184+
1185+
def test_refresh_cube_materialization_success(
    self,
    mocker: MockerFixture,
) -> None:
    """
    A 200 response is parsed into MaterializationInfo, and the POST carries the
    version as a query parameter and the materializations in the JSON body.
    """
    payload = {
        "urls": ["http://workflow1"],
        "output_tables": [],
    }
    fake_response = MagicMock()
    fake_response.status_code = 200
    fake_response.json.return_value = payload

    post_spy = mocker.patch(
        "datajunction_server.service_clients.RequestsSessionWithEndpoint.post",
        return_value=fake_response,
    )

    materializations = [
        {
            "name": "druid_cube__incremental_time__date",
            "job": "DruidCubeMaterializationJob",
            "strategy": "incremental_time",
            "schedule": "@daily",
            "cube": {"name": "test.cube", "version": "v1.0"},
        },
    ]

    result = QueryServiceClient(uri=self.endpoint).refresh_cube_materialization(
        cube_name="test.cube",
        cube_version="v1.0",
        materializations=materializations,
        request_headers={"Cookie": "session=abc123"},
    )

    post_spy.assert_called_once_with(
        "/cubes/test.cube/refresh-materialization",
        headers=ANY,
        params={"cube_version": "v1.0"},
        json={"materializations": materializations},
    )
    expected = MaterializationInfo(urls=["http://workflow1"], output_tables=[])
    assert result == expected
1232+
1233+
def test_refresh_cube_materialization_no_version(
    self,
    mocker: MockerFixture,
) -> None:
    """
    With only a cube name supplied, the POST goes out with empty query
    parameters and an empty JSON body.
    """
    fake_response = MagicMock()
    fake_response.status_code = 200
    fake_response.json.return_value = {
        "urls": [],
        "output_tables": [],
    }

    post_spy = mocker.patch(
        "datajunction_server.service_clients.RequestsSessionWithEndpoint.post",
        return_value=fake_response,
    )

    service = QueryServiceClient(uri=self.endpoint)
    service.refresh_cube_materialization(cube_name="test.cube")

    post_spy.assert_called_once_with(
        "/cubes/test.cube/refresh-materialization",
        headers=ANY,
        params={},
        json={},
    )

0 commit comments

Comments
 (0)