diff --git a/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx b/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx
index e8f0197872af6..4a965ac625bec 100644
--- a/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx
+++ b/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx
@@ -18,12 +18,6 @@ Also requires the M [deployment tier](/product/administration/pricing#deployment
-
-
-The DAX API is currently in preview. Please [contact us](https://cube.dev/contact) to enable it for your account.
-
-
-
Read below about the DAX API [configuration](#configuration),
[authentication](#authentication), and [using it](#using-dax-api-with-power-bi) with Power BI.
diff --git a/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx b/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx
index 89a58480552f2..06874010d6cad 100644
--- a/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx
+++ b/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx
@@ -50,13 +50,6 @@ on configuration and features for specific tools:
/>
-
-
-Previously, Semantic Layer Sync provided a way to connect to [Microsoft Power BI][ref-powerbi].
-Currently, it's recommended to use the [DAX API][ref-dax-api].
-
-
-
## Creating syncs
You can create a new sync by navigating to IDE → Integrations
@@ -381,8 +374,6 @@ on, i.e., your development mode branch, shared branch, or main branch.
[ref-data-model]: /product/data-modeling/overview
[ref-sql-api]: /product/apis-integrations/sql-api
-[ref-powerbi]: /product/configuration/visualization-tools/powerbi
-[ref-dax-api]: /product/apis-integrations/dax-api
[ref-config-file]: /product/configuration#configuration-options
[ref-config-contexts]: /product/configuration/reference/config#scheduledrefreshcontexts
[ref-config-schemaversion]: /product/configuration/reference/config#schema_version
diff --git a/docs/content/product/configuration/visualization-tools/metabase.mdx b/docs/content/product/configuration/visualization-tools/metabase.mdx
index b340ab8df6163..ba3783416aebe 100644
--- a/docs/content/product/configuration/visualization-tools/metabase.mdx
+++ b/docs/content/product/configuration/visualization-tools/metabase.mdx
@@ -10,12 +10,9 @@ Webinar recording: [Instant Insights Unlocked with Cube & Metabase](https://cube
-
-
## Connect from Cube Cloud
-It is recommended to use [Semantic Layer Sync][ref-sls] to connect Cube Cloud to
-Metabase. It automatically synchronizes the [data model][ref-data-model] with Metabase.
+You can connect Cube Cloud to Metabase using the [SQL API][ref-sql-api].
Navigate to the [Integrations](/product/workspace/integrations#connect-specific-tools)
page, click Connect to Cube, and choose Metabase to get
@@ -82,6 +79,3 @@ well.
[ref-sql-api]: /product/apis-integrations/sql-api
[metabase-oss]: https://github.com/metabase/metabase
[metabase]: https://www.metabase.com
-[ref-sls]: /product/apis-integrations/semantic-layer-sync
-[ref-sql-api]: /product/apis-integrations/sql-api
-[ref-data-model]: /product/data-modeling/overview
diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts
index 49877b9894807..d5147f27228d0 100644
--- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts
+++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts
@@ -47,6 +47,12 @@ export class LocalCacheDriver implements CacheDriverInterface {
// Nothing to do
}
+  /**
+   * Empties the in-memory store by deleting every key currently present on it.
+   * The store object itself is kept; only its entries are removed.
+   */
+  public reset(): void {
+    Object.keys(this.store).forEach((storedKey) => {
+      delete this.store[storedKey];
+    });
+  }
+
public async testConnection(): Promise {
// Nothing to do
}
diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
index a33eae2f27799..c4919daa1bb6e 100644
--- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
+++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts
@@ -128,6 +128,7 @@ type CacheEntry = {
time: number;
result: any;
renewalKey: string;
+ requestId?: string;
};
export interface QueryCacheOptions {
@@ -301,6 +302,7 @@ export class QueryCache {
requestId: queryBody.requestId,
dataSource: queryBody.dataSource,
persistent: queryBody.persistent,
+ skipRefreshKeyWaitForRenew: true,
}
);
}
@@ -405,6 +407,12 @@ export class QueryCache {
return key;
}
+  /**
+   * Returns the part of `requestId` that precedes its last `-span-` marker.
+   * A `requestId` without that marker is returned unchanged.
+   *
+   * Example: `'abc-123-span-1'` and `'abc-123-span-2'` both yield `'abc-123'`.
+   */
+  public static extractRequestUUID(requestId: string): string {
+    const spanMarkerAt = requestId.lastIndexOf('-span-');
+    if (spanMarkerAt === -1) {
+      return requestId;
+    }
+
+    return requestId.substring(0, spanMarkerAt);
+  }
+
protected static replaceAll(replaceThis, withThis, inThis) {
withThis = withThis.replace(/\$/g, '$$$$');
return inThis.replace(
@@ -903,7 +911,8 @@ export class QueryCache {
const result = {
time: (new Date()).getTime(),
result: res,
- renewalKey
+ renewalKey,
+ requestId: options.requestId,
};
return this
.cacheDriver
@@ -1003,14 +1012,24 @@ export class QueryCache {
primaryQuery,
renewCycle
});
- if (
- renewalKey && (
- !renewalThreshold ||
- !parsedResult.time ||
- renewedAgo > renewalThreshold * 1000 ||
- parsedResult.renewalKey !== renewalKey
- )
- ) {
+
+ const isExpired = !renewalThreshold || !parsedResult.time || renewedAgo > renewalThreshold * 1000;
+ const isKeyMismatch = renewalKey && parsedResult.renewalKey !== renewalKey;
+ const isSameRequest = options.requestId && parsedResult.requestId &&
+ QueryCache.extractRequestUUID(parsedResult.requestId) === QueryCache.extractRequestUUID(options.requestId);
+
+ // Continue-wait cycle: result was produced by our request,
+ // refreshKey changed during execution — return cached, refresh in background.
+ // Skip for renewCycle — it must always fetch fresh data to keep cache up-to-date.
+ if (isSameRequest && !renewCycle && (isExpired || isKeyMismatch)) {
+ this.logger('Same request cache hit (background refresh)', { cacheKey, renewalThreshold, requestId: options.requestId, spanId, primaryQuery, renewCycle });
+ fetchNew().catch(e => {
+ if (!(e instanceof ContinueWaitError)) {
+ this.logger('Error renewing', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId, primaryQuery, renewCycle });
+ }
+ });
+ } else if (renewalKey && (isExpired || isKeyMismatch)) {
+ // Cache expired or refreshKey changed — need to refresh
if (options.waitForRenew) {
this.logger('Waiting for renew', { cacheKey, renewalThreshold, requestId: options.requestId, spanId, primaryQuery, renewCycle });
return fetchNew();
@@ -1023,6 +1042,7 @@ export class QueryCache {
});
}
}
+
this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle });
if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) {
this.memoryCache.set(redisKey, parsedResult);
diff --git a/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts b/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts
index 3c021ee41cb2b..f8e31b4f4eb97 100644
--- a/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts
+++ b/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts
@@ -1,13 +1,13 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
-/* eslint-disable global-require */
-import R from 'ramda';
import {
BUILD_RANGE_END_LOCAL,
BUILD_RANGE_START_LOCAL,
FROM_PARTITION_RANGE,
TO_PARTITION_RANGE
} from '@cubejs-backend/shared';
-import { PreAggregationPartitionRangeLoader, PreAggregations, version } from '../../src';
+import crypto from 'crypto';
+
+import { PreAggregationPartitionRangeLoader, PreAggregations, QueryCache, LocalCacheDriver, version } from '../../src';
class MockDriver {
public tables: string[] = [];
@@ -123,34 +123,32 @@ describe('PreAggregations', () => {
let mockExternalDriverFactory: (() => Promise) | null = null;
let queryCache: any = null;
- const basicQuery: any = {
+ const defaultCacheKeyQuery: [string, any[], Record] = ['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', [], {
+ renewalThreshold: 10,
+ external: false,
+ }];
+
+ const createBasicQuery = (overrides: Record = {}): any => ({
query: 'SELECT "orders__created_at_week" "orders__created_at_week", sum("orders__count") "orders__count" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20191101) as partition_union WHERE ("orders__created_at_week" >= ($1::timestamptz::timestamptz AT TIME ZONE \'UTC\') AND "orders__created_at_week" <= ($2::timestamptz::timestamptz AT TIME ZONE \'UTC\')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000',
values: ['2019-11-01T00:00:00Z', '2019-11-30T23:59:59Z'],
cacheKeyQueries: {
renewalThreshold: 21600,
- queries: [['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', [], {
- renewalThreshold: 10,
- external: false,
- }]]
+ queries: [defaultCacheKeyQuery]
},
preAggregations: [{
preAggregationsSchema: 'stb_pre_aggregations',
tableName: 'stb_pre_aggregations.orders_number_and_count20191101',
loadSql: ['CREATE TABLE stb_pre_aggregations.orders_number_and_count20191101 AS SELECT\n date_trunc(\'week\', ("orders".created_at::timestamptz AT TIME ZONE \'UTC\')) "orders__created_at_week", count("orders".id) "orders__count", sum("orders".number) "orders__number"\n FROM\n public.orders AS "orders"\n WHERE ("orders".created_at >= $1::timestamptz AND "orders".created_at <= $2::timestamptz) GROUP BY 1', ['2019-11-01T00:00:00Z', '2019-11-30T23:59:59Z']],
- invalidateKeyQueries: [['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', [], {
- renewalThreshold: 10,
- external: false,
- }]]
+ invalidateKeyQueries: [defaultCacheKeyQuery],
}],
- requestId: 'basic'
- };
+ requestId: 'basic',
+ ...overrides,
+ });
- const basicQueryExternal = R.clone(basicQuery);
- basicQueryExternal.preAggregations[0].external = true;
- const basicQueryWithRenew = R.clone(basicQuery);
- basicQueryWithRenew.renewQuery = true;
- const basicQueryExternalWithRenew = R.clone(basicQueryExternal);
- basicQueryExternalWithRenew.renewQuery = true;
+ const basicQuery: any = createBasicQuery();
+ const basicQueryExternal = createBasicQuery({ preAggregations: [{ ...basicQuery.preAggregations[0], external: true }] });
+ const basicQueryWithRenew = createBasicQuery({ renewQuery: true });
+ const basicQueryExternalWithRenew = createBasicQuery({ preAggregations: [{ ...basicQuery.preAggregations[0], external: true }], renewQuery: true });
beforeEach(() => {
mockDriver = new MockDriver();
@@ -167,11 +165,6 @@ describe('PreAggregations', () => {
return driver;
};
- jest.resetModules();
-
- // Dynamic require after resetModules to ensure fresh module state
- // eslint-disable-next-line @typescript-eslint/no-var-requires
- const { QueryCache } = require('../../src/orchestrator/QueryCache');
queryCache = new QueryCache(
'TEST',
mockDriverFactory as any,
@@ -179,12 +172,15 @@ describe('PreAggregations', () => {
() => {},
{
cacheAndQueueDriver: 'memory',
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
},
);
+
+ // Reset the shared in-memory cache store between tests
+ (queryCache.getCacheDriver() as LocalCacheDriver).reset();
});
describe('loadAllPreAggregationsIfNeeded', () => {
@@ -198,7 +194,7 @@ describe('PreAggregations', () => {
() => {},
queryCache!,
{
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
@@ -225,7 +221,7 @@ describe('PreAggregations', () => {
() => {},
queryCache!,
{
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
@@ -252,7 +248,7 @@ describe('PreAggregations', () => {
() => {},
queryCache!,
{
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
@@ -279,7 +275,7 @@ describe('PreAggregations', () => {
() => {},
queryCache!,
{
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
@@ -310,7 +306,7 @@ describe('PreAggregations', () => {
() => {},
queryCache!,
{
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
@@ -344,7 +340,7 @@ describe('PreAggregations', () => {
() => {},
queryCache!,
{
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
@@ -395,7 +391,7 @@ describe('PreAggregations', () => {
() => {},
queryCache!,
{
- queueOptions: () => ({
+ queueOptions: async () => ({
executionTimeout: 1,
concurrency: 2,
}),
@@ -932,8 +928,6 @@ describe('PreAggregations', () => {
// Old implementation (before the unsigned shift fix)
// This would hang on certain inputs, but for inputs that don't trigger the bug,
// it should produce the same results as the new implementation
- const crypto = require('crypto');
-
function oldVersion(cacheKey: any): string | null {
let result = '';
diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts
index 58403e93f6504..5c77fd3333e8c 100644
--- a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts
+++ b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts
@@ -1,24 +1,34 @@
import crypto from 'crypto';
import { createCancelablePromise, pausePromise } from '@cubejs-backend/shared';
-import { QueryCache, QueryCacheOptions } from '../../src';
+import { CacheKey, CacheKeyItem, QueryCache, QueryCacheOptions } from '../../src';
export type QueryCacheTestOptions = QueryCacheOptions & {
beforeAll?: () => Promise,
afterAll?: () => Promise,
};
+// Test-only subclass of QueryCache whose `logger` is a public jest mock, so the
+// tests in this file can clear it (`mockClear`) and inspect the logged messages.
+class QueryCacheOpened extends QueryCache {
+ // NOTE(review): the mock wraps `super.logger`; confirm this resolves to the
+ // intended base implementation (a property assigned in the base constructor
+ // would not be reachable through `super`).
+ public readonly logger = jest.fn(super.logger);
+}
+
export const QueryCacheTest = (name: string, options: QueryCacheTestOptions) => {
describe(`QueryQueue${name}`, () => {
- const cache = new QueryCache(
+ const cache = new QueryCacheOpened(
crypto.randomBytes(16).toString('hex'),
+ () => {
+ throw new Error('driverFactory is not implemented, mock should be used...');
+ },
jest.fn(() => {
- throw new Error('It`s not implemented mock...');
+ throw new Error('logger is not implemented, mock should be used...');
}),
- jest.fn(),
options,
);
+ beforeEach(() => {
+ cache.logger.mockClear();
+ });
+
beforeAll(async () => {
if (options?.beforeAll) {
await options?.beforeAll();
@@ -110,6 +120,263 @@ export const QueryCacheTest = (name: string, options: QueryCacheTestOptions) =>
expect(callbackWasExecuted).toEqual(true);
});
+ describe('cacheQueryResult renewal logic', () => {
+ const renewalKeyA = QueryCache.queryCacheKey({ query: 'key-a', values: [] });
+ const renewalKeyOld = QueryCache.queryCacheKey({ query: 'key-old', values: [] });
+ const renewalKeyNew = QueryCache.queryCacheKey({ query: 'key-new', values: [] });
+
+ // Writes `entry` directly into the cache driver under the redis key derived
+ // from `cacheKey`; 3600 is passed as the third argument of `set`
+ // (presumably an expiration in seconds — confirm).
+ // NOTE(review): `entry` is annotated as CacheKeyItem, but callers pass
+ // cache-entry shaped objects ({ time, result, renewalKey, requestId? }) —
+ // confirm the intended type.
+ const seedCache = async (cacheKey: CacheKey, entry: CacheKeyItem) => {
+ const redisKey = cache.queryRedisKey(cacheKey);
+ await cache.getCacheDriver().set(redisKey, entry, 3600);
+ };
+
+      /**
+       * Seeds the cache with `cacheEntry`, stubs `queryWithRetryAndRelease` and runs
+       * `cacheQueryResult` against the seeded key.
+       *
+       * Resolves to `{ result, fetchNewCalled, blocked }`:
+       *  - `fetchNewCalled`: whether the stub had been invoked by the time the call resolved;
+       *  - `blocked`: whether the call resolved with the stub's value (`'new-result'`).
+       */
+      const callCacheQueryResult = async (
+        cacheKey,
+        cacheEntry,
+        opts: {
+          renewalThreshold?: number;
+          renewalKey?;
+          waitForRenew?: boolean;
+          requestId?: string;
+          renewCycle?: boolean;
+        }
+      ) => {
+        // Entries written by fetchNew() carry the renewal key hashed through
+        // queryRedisKey(), so the seeded entry must carry the same hashed form.
+        const hashedRenewalKey = cacheEntry.renewalKey
+          ? cache.queryRedisKey(cacheEntry.renewalKey)
+          : cacheEntry.renewalKey;
+        await seedCache(cacheKey, { ...cacheEntry, renewalKey: hashedRenewalKey });
+
+        let fetchNewInvoked = false;
+        const queryStub = jest.spyOn(cache, 'queryWithRetryAndRelease').mockImplementation(async () => {
+          fetchNewInvoked = true;
+          return 'new-result';
+        });
+
+        try {
+          const result = await cache.cacheQueryResult(
+            'SELECT 1',
+            [],
+            cacheKey,
+            3600,
+            {
+              renewalThreshold: opts.renewalThreshold ?? 600,
+              renewalKey: opts.renewalKey,
+              waitForRenew: opts.waitForRenew ?? false,
+              requestId: opts.requestId,
+              dataSource: 'default',
+              renewCycle: opts.renewCycle,
+            }
+          );
+
+          // Receiving the stub's value means the call waited for the fresh fetch.
+          return { result, fetchNewCalled: fetchNewInvoked, blocked: result === 'new-result' };
+        } finally {
+          // Always restore the real implementation so later tests are unaffected.
+          queryStub.mockRestore();
+        }
+      };
+
+ it('expired + waitForRenew: blocks on fetchNew', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'expired-wait', values: [] });
+ const entry = {
+ time: Date.now() - 700 * 1000,
+ result: 'cached-data',
+ renewalKey: renewalKeyA,
+ };
+
+ const { result, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyA,
+ waitForRenew: true,
+ requestId: 'req-1',
+ });
+
+ expect(blocked).toBe(true);
+ expect(result).toBe('new-result');
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew');
+ });
+
+ it('expired + no waitForRenew: returns cached, background refresh', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'expired-no-wait', values: [] });
+ const entry = {
+ time: Date.now() - 700 * 1000,
+ result: 'cached-data',
+ renewalKey: renewalKeyA,
+ };
+
+ const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyA,
+ waitForRenew: false,
+ requestId: 'req-2',
+ });
+
+ expect(result).toBe('cached-data');
+ expect(fetchNewCalled).toBe(true);
+ expect(blocked).toBe(false);
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Renewing existing key');
+ });
+
+ it('key mismatch + not expired + waitForRenew: blocks on fetchNew', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'key-mismatch-user', values: [] });
+ const entry = {
+ time: Date.now() - 100 * 1000,
+ result: 'cached-data',
+ renewalKey: renewalKeyOld,
+ };
+
+ const { result, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyNew,
+ waitForRenew: true,
+ renewCycle: false,
+ requestId: 'req-3',
+ });
+
+ expect(blocked).toBe(true);
+ expect(result).toBe('new-result');
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew');
+ });
+
+ it('key mismatch + not expired + renew cycle: blocks on fetchNew', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'key-mismatch-renew', values: [] });
+ const entry = {
+ time: Date.now() - 100 * 1000,
+ result: 'cached-data',
+ renewalKey: renewalKeyOld,
+ };
+
+ const { result, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyNew,
+ waitForRenew: true,
+ renewCycle: true,
+ requestId: 'req-4',
+ });
+
+ expect(blocked).toBe(true);
+ expect(result).toBe('new-result');
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew');
+ });
+
+ it('same request + expired: returns cached, background refresh', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-expired', values: [] });
+ const entry = {
+ time: Date.now() - 700 * 1000,
+ result: 'cached-data',
+ renewalKey: renewalKeyOld,
+ requestId: 'abc-123-span-1',
+ };
+
+ const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyNew,
+ waitForRenew: true,
+ requestId: 'abc-123-span-2',
+ });
+
+ expect(result).toBe('cached-data');
+ expect(fetchNewCalled).toBe(true);
+ expect(blocked).toBe(false);
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Same request cache hit (background refresh)');
+ });
+
+ it('same request + key mismatch only: returns cached, background refresh', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-key-mismatch', values: [] });
+ const entry = {
+ time: Date.now() - 100 * 1000,
+ result: 'cached-data',
+ renewalKey: renewalKeyOld,
+ requestId: 'conn-456-sub-789-span-aaa',
+ };
+
+ const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyNew,
+ waitForRenew: true,
+ requestId: 'conn-456-sub-789-span-bbb',
+ });
+
+ expect(result).toBe('cached-data');
+ expect(fetchNewCalled).toBe(true);
+ expect(blocked).toBe(false);
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Same request cache hit (background refresh)');
+ });
+
+ it('same request + renewCycle + key mismatch: must block on fetchNew (not return stale cache)', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-renew-cycle', values: [] });
+ const entry = {
+ time: Date.now() - 100 * 1000,
+ result: 'stale-data',
+ renewalKey: renewalKeyOld,
+ requestId: 'req-cycle-span-1',
+ };
+
+ const { result, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyNew,
+ waitForRenew: true,
+ renewCycle: true,
+ requestId: 'req-cycle-span-2',
+ });
+
+ // renewCycle must always fetch fresh data even when requestId matches
+ expect(blocked).toBe(true);
+ expect(result).toBe('new-result');
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew');
+ });
+
+ it('same request + renewCycle + expired: must block on fetchNew', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-renew-cycle-expired', values: [] });
+ const entry = {
+ time: Date.now() - 700 * 1000,
+ result: 'stale-data',
+ renewalKey: renewalKeyOld,
+ requestId: 'req-exp-cycle-span-1',
+ };
+
+ const { result, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyNew,
+ waitForRenew: true,
+ renewCycle: true,
+ requestId: 'req-exp-cycle-span-2',
+ });
+
+ expect(blocked).toBe(true);
+ expect(result).toBe('new-result');
+ expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew');
+ });
+
+ it('key matches + not expired: returns cached, no fetchNew', async () => {
+ const cacheKey = QueryCache.queryCacheKey({ query: 'key-match-fresh', values: [] });
+ const entry = {
+ time: Date.now() - 100 * 1000,
+ result: 'cached-data',
+ renewalKey: renewalKeyA,
+ };
+
+ const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, {
+ renewalThreshold: 600,
+ renewalKey: renewalKeyA,
+ waitForRenew: true,
+ requestId: 'req-7',
+ });
+
+ expect(result).toBe('cached-data');
+ expect(fetchNewCalled).toBe(false);
+ expect(blocked).toBe(false);
+ expect(cache.logger.mock.calls.map(c => c[0])).not.toContain('Waiting for renew');
+ expect(cache.logger.mock.calls.map(c => c[0])).not.toContain('Renewing existing key');
+ });
+ });
+
it('queryCacheKey format', () => {
const key1 = QueryCache.queryCacheKey({
query: 'select data',
diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
index 7a5b16fa766b1..f1137a4d607f0 100644
--- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
+++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
@@ -952,6 +952,7 @@ export class BaseQuery {
cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin'),
disableExternalPreAggregations: !!this.options.disableExternalPreAggregations,
maskedMembers: this.options.maskedMembers,
+ memberToAlias: this.options.memberToAlias,
};
try {
diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts
index c65bbe81f7d62..010bc1ae59b9d 100644
--- a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts
+++ b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts
@@ -1010,6 +1010,54 @@ SELECT 1 AS revenue, cast('2024-01-01' AS timestamp) as time UNION ALL
visitors__str_case: 'More than 1'
}]));
+ it('simple join member to alias', async () => runQueryTest({
+ measures: [
+ 'visitors.visitor_revenue',
+ 'visitors.visitor_count',
+ 'visitors.per_visitor_revenue'
+ ],
+ dimensions: [
+ 'visitors.source'
+ ],
+ timeDimensions: [{
+ dimension: 'visitors.created_at',
+ dateRange: ['2017-01-01', '2017-01-30'],
+ granularity: 'month'
+ }],
+ timezone: 'America/Los_Angeles',
+ memberToAlias: {
+ 'visitors.visitor_revenue': 'custom_revenue',
+ 'visitors.visitor_count': 'custom_count',
+ 'visitors.source': 'custom_source',
+ 'visitors.created_at': 'custom_created_at',
+ },
+ order: []
+ },
+
+ [
+ {
+ custom_source: 'google',
+ custom_created_at_month: '2017-01-01T00:00:00.000Z',
+ custom_revenue: null,
+ custom_count: '1',
+ visitors__per_visitor_revenue: null
+ },
+ {
+ custom_source: 'some',
+ custom_created_at_month: '2017-01-01T00:00:00.000Z',
+ custom_revenue: '300',
+ custom_count: '2',
+ visitors__per_visitor_revenue: '150'
+ },
+ {
+ custom_source: null,
+ custom_created_at_month: '2017-01-01T00:00:00.000Z',
+ custom_revenue: null,
+ custom_count: '2',
+ visitors__per_visitor_revenue: null
+ }
+ ]));
+
it('running total', async () => {
await compiler.compile();
diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts
index f022048271fe1..55b76da1adb91 100644
--- a/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts
+++ b/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts
@@ -148,3 +148,125 @@ cube(\`C\`, {
c__important_value: '81.5',
}]));
});
+
+describe('Sub Query Dimensions in Filters', () => {
+ jest.setTimeout(200000);
+
+ const { compiler, joinGraph, cubeEvaluator } = prepareJsCompiler(`
+cube(\`Sales\`, {
+ sql: \`
+ SELECT 1 AS id, 10 AS customer_id, 100 AS amount UNION ALL
+ SELECT 2 AS id, 10 AS customer_id, 200 AS amount UNION ALL
+ SELECT 3 AS id, 20 AS customer_id, 50 AS amount UNION ALL
+ SELECT 4 AS id, 30 AS customer_id, 75 AS amount
+ \`,
+
+ joins: {
+ Customers: {
+ relationship: \`many_to_one\`,
+ sql: \`\${CUBE}.customer_id = \${Customers}.id\`,
+ },
+ },
+
+ measures: {
+ totalAmount: {
+ sql: \`amount\`,
+ type: \`sum\`,
+ },
+ },
+
+ dimensions: {
+ id: {
+ sql: \`id\`,
+ type: \`number\`,
+ primaryKey: true,
+ },
+ },
+});
+
+cube(\`Customers\`, {
+ sql: \`
+ SELECT 10 AS id UNION ALL
+ SELECT 20 AS id UNION ALL
+ SELECT 30 AS id
+ \`,
+
+ joins: {
+ CustomerOrders: {
+ relationship: \`one_to_many\`,
+ sql: \`\${CUBE}.id = \${CustomerOrders}.customer_id\`,
+ },
+ },
+
+ dimensions: {
+ id: {
+ sql: \`id\`,
+ type: \`number\`,
+ primaryKey: true,
+ },
+
+ totalSpend: {
+ sql: \`\${CustomerOrders.orderTotal}\`,
+ type: \`number\`,
+ subQuery: true,
+ },
+ },
+});
+
+cube(\`CustomerOrders\`, {
+ sql: \`
+ SELECT 1 AS id, 10 AS customer_id, 80 AS amount UNION ALL
+ SELECT 2 AS id, 10 AS customer_id, 70 AS amount UNION ALL
+ SELECT 3 AS id, 20 AS customer_id, 30 AS amount UNION ALL
+ SELECT 4 AS id, 30 AS customer_id, 200 AS amount
+ \`,
+
+ dimensions: {
+ id: {
+ sql: \`id\`,
+ type: \`number\`,
+ primaryKey: true,
+ },
+
+ customerId: {
+ sql: \`\${CUBE}.customer_id\`,
+ type: \`number\`,
+ },
+ },
+
+ measures: {
+ orderTotal: {
+ sql: \`amount\`,
+ type: \`sum\`,
+ },
+ },
+});
+ `);
+
+  /**
+   * Compiles the data model, builds the Postgres SQL for query `q`, runs it
+   * through `dbRunner` and asserts that the rows equal `expectedResult`.
+   */
+  async function runQueryTest(q, expectedResult) {
+    await compiler.compile();
+    const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, q);
+
+    // Build the SQL once so the statement that is logged is exactly the one executed.
+    const sqlAndParams = query.buildSqlAndParams();
+    console.log(sqlAndParams);
+
+    const res = await dbRunner.testQuery(sqlAndParams);
+    console.log(JSON.stringify(res));
+
+    expect(res).toEqual(expectedResult);
+  }
+
+ it('subquery dimension used in filter', async () => runQueryTest({
+ measures: ['Sales.totalAmount'],
+ filters: [
+ {
+ member: 'Customers.totalSpend',
+ operator: 'gt',
+ values: ['100'],
+ },
+ ],
+ }, [{
+ sales__total_amount: '375',
+ }]));
+});
diff --git a/rust/cubesqlplanner/cubesqlplanner/Cargo.toml b/rust/cubesqlplanner/cubesqlplanner/Cargo.toml
index 264a7159f7829..054d07612b296 100644
--- a/rust/cubesqlplanner/cubesqlplanner/Cargo.toml
+++ b/rust/cubesqlplanner/cubesqlplanner/Cargo.toml
@@ -23,6 +23,7 @@ regex = "1.3.9"
typed-builder = "0.21.2"
indoc = "2.0.7"
+
[dev-dependencies]
petgraph = "0.6"
serde_yaml = "0.9"
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs
index 3196071960449..1d58958f32e20 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs
@@ -11,6 +11,7 @@ use cubenativeutils::wrappers::{NativeArray, NativeContextHolder, NativeObjectHa
use cubenativeutils::CubeError;
use serde::{Deserialize, Serialize};
use std::any::Any;
+use std::collections::HashMap;
use std::rc::Rc;
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -75,6 +76,8 @@ pub struct BaseQueryOptionsStatic {
pub pre_aggregation_id: Option,
#[serde(rename = "maskedMembers")]
pub masked_members: Option>,
+ #[serde(rename = "memberToAlias", default)]
+ pub member_to_alias: Option>,
}
#[nativebridge::native_bridge(BaseQueryOptionsStatic)]
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs
index 3be34c87a8d70..63d2260c55b9e 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs
@@ -2,17 +2,16 @@ use crate::logical_plan::visitor::*;
use crate::logical_plan::*;
use cubenativeutils::CubeError;
use itertools::Itertools;
-use std::collections::HashSet;
use std::rc::Rc;
+/// Visitor state: the cube names gathered while walking a logical plan, one
+/// entry per `PlanNode::Cube` handed to `process_node`.
struct CubeNamesCollector {
- cube_names: HashSet,
+ cube_names: Vec,
}
impl LogicalNodeVisitor for CubeNamesCollector {
fn process_node(&mut self, node: &PlanNode) -> Result<(), CubeError> {
if let PlanNode::Cube(cube) = node {
- self.cube_names.insert(cube.name().clone());
+ self.cube_names.push(cube.name().clone());
}
Ok(())
}
@@ -22,9 +21,9 @@ pub fn collect_cube_names_from_node(
node: &Rc,
) -> Result, CubeError> {
let mut collector = CubeNamesCollector {
- cube_names: HashSet::new(),
+ cube_names: Vec::new(),
};
let visitor = LogicalPlanVisitor::new();
visitor.visit(&mut collector, node)?;
- Ok(collector.cube_names.into_iter().collect_vec())
+ Ok(collector.cube_names.into_iter().unique().collect_vec())
}
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs
index 8ee5d10c4e9df..dd8a0bffc962f 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs
@@ -1,8 +1,9 @@
+use crate::planner::multi_fact_join_groups::MultiFactJoinGroups;
use crate::planner::sql_evaluator::{MemberSymbol, SqlCall};
use std::fmt::Debug;
use std::rc::Rc;
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct PreAggregationJoinItem {
pub from: Rc,
pub to: Rc,
@@ -11,18 +12,18 @@ pub struct PreAggregationJoinItem {
pub on_sql: Rc,
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct PreAggregationJoin {
pub root: Rc,
pub items: Vec,
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct PreAggregationUnion {
pub items: Vec>,
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub struct PreAggregationTable {
pub cube_name: String,
pub cube_alias: String,
@@ -30,7 +31,7 @@ pub struct PreAggregationTable {
pub alias: Option,
}
-#[derive(Clone)]
+#[derive(Clone, Debug)]
pub enum PreAggregationSource {
Single(PreAggregationTable),
Join(PreAggregationJoin),
@@ -49,6 +50,7 @@ pub struct CompiledPreAggregation {
pub time_dimensions: Vec>,
pub segments: Vec>,
pub allow_non_strict_date_range_match: bool,
+ pub multi_fact_join_groups: MultiFactJoinGroups,
}
impl Debug for CompiledPreAggregation {
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs
index 6d0aa5a61480a..3d5803cff715f 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs
@@ -3,6 +3,7 @@ use crate::planner::sql_evaluator::MemberSymbol;
use cubenativeutils::CubeError;
use std::collections::HashSet;
use std::rc::Rc;
+
pub struct MeasureMatcher {
only_addictive: bool,
pre_aggregation_measures: HashSet,
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs
index 7fcb9477b654c..9b8548cb17456 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs
@@ -3,6 +3,8 @@ use super::*;
use crate::logical_plan::visitor::{LogicalPlanRewriter, NodeRewriteResult};
use crate::logical_plan::*;
use crate::plan::FilterItem;
+use crate::planner::join_hints::JoinHints;
+use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups};
use crate::planner::query_tools::QueryTools;
use crate::planner::sql_evaluator::MemberSymbol;
use cubenativeutils::CubeError;
@@ -462,11 +464,58 @@ impl PreAggregationOptimizer {
if match_state == MatchState::NotMatched {
return Ok(None);
}
- self.try_match_measures(
+ let matched = self.try_match_measures(
&all_measures,
pre_aggregation,
match_state == MatchState::Partial,
- )
+ )?;
+ if matched.is_none() {
+ return Ok(None);
+ }
+
+ if !self.are_join_paths_matching(schema, &all_measures, pre_aggregation)? {
+ return Ok(None);
+ }
+
+ Ok(matched)
+ }
+
+ fn are_join_paths_matching( // Ok(false) when a query member's join path differs from the pre-aggregation's path for that member
+ &self,
+ schema: &Rc,
+ measures: &[Rc],
+ pre_aggregation: &CompiledPreAggregation,
+ ) -> Result {
+ let query_hints = MeasuresJoinHints::builder(&JoinHints::new()) // the query side starts from empty join hints
+ .add_dimensions(&schema.dimensions)
+ .add_dimensions(&schema.time_dimensions)
+ .build(measures)?; // NOTE(review): add_filters is never called here, so filters do not contribute hints — confirm intended
+ let query_groups = MultiFactJoinGroups::try_new(self.query_tools.clone(), query_hints)?;
+ let pre_aggr_groups = &pre_aggregation.multi_fact_join_groups; // groups stored on the compiled pre-aggregation
+
+ for dim in schema
+ .dimensions
+ .iter()
+ .chain(schema.time_dimensions.iter())
+ {
+ let query_path = query_groups.resolve_join_path_for_dimension(dim);
+ let pre_aggr_path = pre_aggr_groups.resolve_join_path_for_dimension(dim);
+ match (query_path, pre_aggr_path) {
+ (Some(qp), Some(pp)) if qp != pp => return Ok(false), // both sides resolved a path and the paths differ
+ _ => {} // equal paths, or a path missing on either side, do not reject the match
+ }
+ }
+
+ for measure in measures.iter() { // same comparison as above, using the per-measure paths
+ let query_path = query_groups.resolve_join_path_for_measure(measure);
+ let pre_aggr_path = pre_aggr_groups.resolve_join_path_for_measure(measure);
+ match (query_path, pre_aggr_path) {
+ (Some(qp), Some(pp)) if qp != pp => return Ok(false),
+ _ => {}
+ }
+ }
+
+ Ok(true) // no conflicting path was found
}
fn try_match_measures(
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs
index 22bec416c1f99..cb667f3905014 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs
@@ -8,10 +8,10 @@ use crate::logical_plan::PreAggregationJoinItem;
use crate::logical_plan::PreAggregationTable;
use crate::logical_plan::PreAggregationUnion;
use crate::planner::join_hints::JoinHints;
+use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups};
use crate::planner::planners::JoinPlanner;
use crate::planner::planners::ResolvedJoinItem;
use crate::planner::query_tools::QueryTools;
-use crate::planner::sql_evaluator::collectors::collect_cube_names_from_symbols;
use crate::planner::sql_evaluator::MemberSymbol;
use crate::planner::sql_evaluator::TimeDimensionSymbol;
use crate::planner::GranularityHelper;
@@ -180,6 +180,14 @@ impl PreAggregationsCompiler {
.static_data()
.allow_non_strict_date_range_match
.unwrap_or(false);
+
+ let measures_join_hints = MeasuresJoinHints::builder(&JoinHints::new())
+ .add_dimensions(&dimensions)
+ .add_dimensions(&time_dimensions)
+ .build(&measures)?;
+ let multi_fact_join_groups =
+ MultiFactJoinGroups::try_new(self.query_tools.clone(), measures_join_hints)?;
+
let rollups = if let Some(refs) = description.rollup_references()? {
let r = self
.query_tools
@@ -191,7 +199,17 @@ impl PreAggregationsCompiler {
};
let source = if static_data.pre_aggregation_type == "rollupJoin" {
- PreAggregationSource::Join(self.build_join_source(&measures, &dimensions, &rollups)?)
+ let all_dimensions = dimensions
+ .iter()
+ .cloned()
+ .chain(time_dimensions.iter().cloned())
+ .chain(segments.iter().cloned())
+ .collect_vec();
+ PreAggregationSource::Join(self.build_join_source(
+ &measures,
+ &all_dimensions,
+ &rollups,
+ )?)
} else {
let cube = self
.query_tools
@@ -221,6 +239,7 @@ impl PreAggregationsCompiler {
time_dimensions,
segments,
allow_non_strict_date_range_match,
+ multi_fact_join_groups,
});
self.compiled_cache.insert(name.clone(), res.clone());
Ok(res)
@@ -283,6 +302,7 @@ impl PreAggregationsCompiler {
.allow_non_strict_date_range_match
.unwrap_or(false);
let granularity = pre_aggrs_for_lambda[0].granularity.clone();
+ let multi_fact_join_groups = pre_aggrs_for_lambda[0].multi_fact_join_groups.clone();
let source = PreAggregationSource::Union(PreAggregationUnion { items: sources });
let static_data = description.static_data();
@@ -297,6 +317,7 @@ impl PreAggregationsCompiler {
time_dimensions,
segments,
allow_non_strict_date_range_match,
+ multi_fact_join_groups,
});
self.compiled_cache.insert(name.clone(), res.clone());
Ok(res)
@@ -332,23 +353,30 @@ impl PreAggregationsCompiler {
Ok(())
}
+ fn join_hints_from_pre_aggregation(&self, symbols: &Vec>) -> JoinHints { // turns each symbol's path() into a join hint, in symbol order
+ let mut result = JoinHints::new();
+ for symbol in symbols {
+ let path = symbol.path();
+ if path.len() == 1 {
+ result.push(JoinHintItem::Single(path[0].clone())); // one-element path becomes a Single hint
+ } else {
+ result.push(JoinHintItem::Vector(path.clone())); // NOTE(review): an empty path would also land here as an empty Vector — confirm path() is never empty
+ }
+ }
+ result
+ }
fn build_join_source(
&mut self,
measures: &Vec>,
- dimensions: &Vec>,
+ all_dimensions: &Vec>,
rollups: &Vec,
) -> Result {
let all_symbols = measures
.iter()
.cloned()
- .chain(dimensions.iter().cloned())
+ .chain(all_dimensions.iter().cloned())
.collect_vec();
- let pre_aggr_join_hints = JoinHints::from_items(
- collect_cube_names_from_symbols(&all_symbols)?
- .into_iter()
- .map(|v| JoinHintItem::Single(v))
- .collect_vec(),
- );
+ let pre_aggr_join_hints = self.join_hints_from_pre_aggregation(&all_symbols);
let join_planner = JoinPlanner::new(self.query_tools.clone());
let pre_aggrs_for_join = rollups
@@ -366,16 +394,13 @@ impl PreAggregationsCompiler {
.iter()
.cloned()
.chain(join_pre_aggr.dimensions.iter().cloned())
+ .chain(join_pre_aggr.time_dimensions.iter().cloned())
+ .chain(join_pre_aggr.segments.iter().cloned())
.collect_vec();
- let join_pre_aggr_join_hints = JoinHints::from_items(
- collect_cube_names_from_symbols(&all_symbols)?
- .into_iter()
- .map(|v| JoinHintItem::Single(v))
- .collect_vec(),
- );
- existing_joins.append(
- &mut join_planner.resolve_join_members_by_hints(&join_pre_aggr_join_hints)?,
- );
+ let join_pre_aggr_join_hints = self.join_hints_from_pre_aggregation(&all_symbols);
+ let mut existing =
+ join_planner.resolve_join_members_by_hints(&join_pre_aggr_join_hints)?;
+ existing_joins.append(&mut existing);
}
let not_existing_joins = target_joins
@@ -395,6 +420,7 @@ impl PreAggregationsCompiler {
.iter()
.map(|item| self.make_pre_aggregation_join_item(&pre_aggrs_for_join, item))
.collect::, _>>()?;
+
let res = PreAggregationJoin {
root: items[0].from.clone(),
items,
@@ -821,4 +847,63 @@ mod tests {
assert_eq!(compiled.dimensions.len(), 1);
assert_eq!(compiled.dimensions[0].full_name(), "orders.status");
}
+
+ #[test]
+ fn test_compile_rollup_join_calculated_measures() { // compiles line_items.combined_rollup_join and checks the shape of its Join source
+ let schema = MockSchema::from_yaml_file("common/rollup_join_calculated_measures.yaml");
+ let test_context = TestContext::new(schema).unwrap();
+ let query_tools = test_context.query_tools().clone();
+
+ let cube_names = vec![
+ "line_items".to_string(),
+ "facts".to_string(),
+ "campaigns".to_string(),
+ ];
+ let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap();
+
+ let pre_agg_name = PreAggregationFullName::new(
+ "line_items".to_string(),
+ "combined_rollup_join".to_string(),
+ );
+ let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap();
+
+ // Check source is Join with correct structure
+ match compiled.source.as_ref() {
+ PreAggregationSource::Join(join) => {
+ // Root should be li_rollup
+ match join.root.as_ref() {
+ PreAggregationSource::Single(table) => {
+ assert_eq!(table.name, "li_rollup", "Root should be li_rollup");
+ }
+ _ => panic!("Expected Single source for root"),
+ }
+ // Should have 2 join items (line_items→facts, line_items→campaigns)
+ assert_eq!(
+ join.items.len(),
+ 2,
+ "Should have 2 join items, got {}",
+ join.items.len()
+ );
+ let to_names: Vec = join // table names of the `to` side of every join item
+ .items
+ .iter()
+ .map(|item| match item.to.as_ref() {
+ PreAggregationSource::Single(table) => table.name.clone(),
+ _ => panic!("Expected Single source"),
+ })
+ .collect();
+ assert!( // membership checks only, so the order of join items is not asserted
+ to_names.contains(&"facts_rollup".to_string()),
+ "Should join to facts_rollup, got: {:?}",
+ to_names
+ );
+ assert!(
+ to_names.contains(&"campaigns_rollup".to_string()),
+ "Should join to campaigns_rollup, got: {:?}",
+ to_names
+ );
+ }
+ _ => panic!("Expected PreAggregationSource::Join"),
+ }
+ }
}
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs
index 3caadc6dbe3af..27651cc87a0ef 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs
@@ -35,6 +35,7 @@ impl BaseQuery {
options.static_data().timezone.clone(),
options.static_data().export_annotated_sql,
options.static_data().masked_members.clone(),
+ options.static_data().member_to_alias.clone(),
)?;
let request = QueryProperties::try_new(query_tools.clone(), options)?;
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs
index 109b22a671c6a..f590b21e32274 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs
@@ -10,22 +10,28 @@ impl JoinHints {
Self { items: Vec::new() }
}
- pub fn from_items(mut items: Vec) -> Self {
- items.sort();
- items.dedup();
+ pub fn from_items(items: Vec) -> Self {
Self { items }
}
- pub fn insert(&mut self, item: JoinHintItem) {
- match self.items.binary_search(&item) {
- Ok(_) => {}
- Err(pos) => self.items.insert(pos, item),
+ pub fn push(&mut self, item: JoinHintItem) { // appends `item` in insertion order, skipping a Single that repeats the tail of the last item
+ if let JoinHintItem::Single(ref name) = item { // only Single items are ever skipped; Vector items are always appended
+ if let Some(last) = self.items.last() { // only the immediately preceding item is inspected
+ let redundant = match last {
+ JoinHintItem::Single(s) => s == name, // same name as the previous Single
+ JoinHintItem::Vector(v) => v.last() == Some(name), // previous Vector already ends at this name
+ };
+ if redundant {
+ return;
+ }
+ }
}
+ self.items.push(item);
}
pub fn extend(&mut self, other: &JoinHints) {
- for item in other.items.iter() {
- self.insert(item.clone());
+ for item in &other.items {
+ self.push(item.clone());
}
}
@@ -71,8 +77,6 @@ impl<'a> IntoIterator for &'a JoinHints {
#[cfg(test)]
mod tests {
use super::*;
- use std::collections::hash_map::DefaultHasher;
- use std::hash::{Hash, Hasher};
fn s(name: &str) -> JoinHintItem {
JoinHintItem::Single(name.to_string())
@@ -82,68 +86,88 @@ mod tests {
JoinHintItem::Vector(names.iter().map(|n| n.to_string()).collect())
}
- fn hash_of(h: &JoinHints) -> u64 {
- let mut hasher = DefaultHasher::new();
- h.hash(&mut hasher);
- hasher.finish()
- }
-
#[test]
- fn test_from_items_normalizes_and_deduplicates() {
- let hints = JoinHints::from_items(vec![
- s("orders"),
- v(&["users", "orders"]),
- s("orders"),
- s("abc"),
- ]);
+ fn test_from_items_preserves_order() {
+ let hints = JoinHints::from_items(vec![s("orders"), v(&["users", "orders"]), s("abc")]);
assert_eq!(hints.len(), 3);
- // sorted: Single comes before Vector for same content, and Singles are alphabetical
- assert_eq!(hints.items()[0], s("abc"));
- assert_eq!(hints.items()[1], s("orders"));
- assert_eq!(hints.items()[2], v(&["users", "orders"]));
-
- // Different insertion order → same result
- let hints2 = JoinHints::from_items(vec![s("abc"), v(&["users", "orders"]), s("orders")]);
- assert_eq!(hints, hints2);
- assert_eq!(hash_of(&hints), hash_of(&hints2));
+ assert_eq!(hints.items()[0], s("orders"));
+ assert_eq!(hints.items()[1], v(&["users", "orders"]));
+ assert_eq!(hints.items()[2], s("abc"));
}
#[test]
- fn test_insert_and_extend_preserve_invariant() {
+ fn test_push_and_extend() {
let mut a = JoinHints::new();
assert!(a.is_empty());
- a.insert(s("orders"));
- a.insert(s("abc"));
- a.insert(s("orders")); // duplicate
+ a.push(s("orders"));
+ a.push(s("abc"));
assert_eq!(a.len(), 2);
- assert_eq!(a.items()[0], s("abc"));
- assert_eq!(a.items()[1], s("orders"));
- let b = JoinHints::from_items(vec![s("orders"), v(&["a", "b"]), s("zzz")]);
+ let b = JoinHints::from_items(vec![s("zzz"), v(&["a", "b"])]);
a.extend(&b);
assert_eq!(a.len(), 4);
- // abc, orders, zzz (Singles sorted), then Vector
- assert_eq!(a.items()[0], s("abc"));
- assert_eq!(a.items()[1], s("orders"));
+ assert_eq!(a.items()[0], s("orders"));
+ assert_eq!(a.items()[1], s("abc"));
assert_eq!(a.items()[2], s("zzz"));
assert_eq!(a.items()[3], v(&["a", "b"]));
}
+ #[test]
+ fn test_extend_skips_redundant_at_boundary() { // extend() goes through push(), so the first extended item can be dropped as redundant
+ let mut a = JoinHints::new();
+ a.push(s("orders"));
+
+ let b = JoinHints::from_items(vec![s("orders"), s("abc")]);
+ a.extend(&b); // leading "orders" repeats the last item of `a` and is skipped
+ assert_eq!(a.len(), 2);
+ assert_eq!(a.items()[0], s("orders"));
+ assert_eq!(a.items()[1], s("abc"));
+
+ let mut c = JoinHints::new();
+ c.push(v(&["x", "abc"]));
+ let d = JoinHints::from_items(vec![s("abc"), s("zzz")]);
+ c.extend(&d); // leading "abc" matches the last element of the preceding Vector
+ assert_eq!(
+ c.len(),
+ 2,
+ "Single after Vector ending with same name is skipped on extend"
+ );
+ assert_eq!(c.items()[0], v(&["x", "abc"]));
+ assert_eq!(c.items()[1], s("zzz"));
+ }
+
+ #[test]
+ fn test_push_skips_redundant_single() { // covers the three push() outcomes for a Single item
+ let mut hints = JoinHints::new();
+ hints.push(s("orders"));
+ hints.push(s("orders")); // same Single twice in a row: second one is skipped
+ assert_eq!(hints.len(), 1);
+
+ hints.push(v(&["users", "orders"])); // Vector items are appended even though the previous Single is "orders"
+ hints.push(s("orders"));
+ assert_eq!(
+ hints.len(),
+ 2,
+ "Single after Vector ending with same name is skipped"
+ );
+
+ hints.push(s("abc"));
+ assert_eq!(hints.len(), 3, "Different Single is added");
+ }
+
#[test]
fn test_into_items_and_into_iter() {
let hints = JoinHints::from_items(vec![s("b"), s("a"), v(&["x", "y"])]);
let cloned = hints.clone();
- // into_iter
let collected: Vec<_> = cloned.into_iter().collect();
assert_eq!(collected.len(), 3);
- // into_items
let items = hints.into_items();
assert_eq!(items.len(), 3);
- assert_eq!(items[0], s("a"));
- assert_eq!(items[1], s("b"));
+ assert_eq!(items[0], s("b"));
+ assert_eq!(items[1], s("a"));
}
}
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs
index 90cccfb6104ea..72747cbfe84c0 100644
--- a/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs
+++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs
@@ -3,6 +3,7 @@ pub mod base_join_condition;
pub mod base_query;
pub mod filter;
pub mod join_hints;
+pub mod multi_fact_join_groups;
pub mod time_dimension;
pub mod params_allocator;
diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/multi_fact_join_groups.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/multi_fact_join_groups.rs
new file mode 100644
index 0000000000000..3d6e3756b9296
--- /dev/null
+++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/multi_fact_join_groups.rs
@@ -0,0 +1,433 @@
+use crate::cube_bridge::join_definition::JoinDefinition;
+use crate::plan::FilterItem;
+use crate::planner::join_hints::JoinHints;
+use crate::planner::query_tools::JoinKey;
+use crate::planner::query_tools::QueryTools;
+use crate::planner::sql_evaluator::collectors::{collect_join_hints, has_multi_stage_members};
+use crate::planner::sql_evaluator::MemberSymbol;
+use cubenativeutils::CubeError;
+use itertools::Itertools;
+use std::collections::HashMap;
+use std::rc::Rc;
+
+#[derive(Clone, Debug)]
+pub struct MeasureJoinHints { // a measure paired with the join hints computed for it
+ pub measure: Rc,
+ pub hints: JoinHints, // as built by MeasuresJoinHints::from_base_hints: base hints extended with the hints collected from `measure`
+}
+
+pub struct MeasuresJoinHintsBuilder { // collects the inputs for MeasuresJoinHints; obtained from MeasuresJoinHints::builder
+ initial_hints: JoinHints, // hints that build() starts from before adding dimension and filter hints
+ dimensions: Vec>, // accumulated by add_dimensions
+ filters: Vec, // accumulated by add_filters
+}
+
+impl MeasuresJoinHintsBuilder {
+ pub fn add_dimensions(mut self, dims: &[Rc]) -> Self { // appends clones of `dims`; returns the builder for chaining
+ self.dimensions.extend(dims.iter().cloned());
+ self
+ }
+
+ pub fn add_filters(mut self, filters: &[FilterItem]) -> Self { // appends clones of `filters`; returns the builder for chaining
+ self.filters.extend(filters.iter().cloned());
+ self
+ }
+
+ pub fn build(self, measures: &[Rc]) -> Result { // base hints = initial hints + dimension hints + filter-member hints, then per-measure hints are derived
+ let mut base_hints = self.initial_hints;
+
+ for sym in self.dimensions.iter() {
+ base_hints.extend(&collect_join_hints(sym)?); // errors from collect_join_hints are propagated
+ }
+
+ let mut filter_symbols = Vec::new();
+ for item in self.filters.iter() {
+ item.find_all_member_evaluators(&mut filter_symbols); // gathers the member symbols referenced by each filter
+ }
+ for sym in filter_symbols.iter() {
+ base_hints.extend(&collect_join_hints(sym)?);
+ }
+
+ MeasuresJoinHints::from_base_hints(base_hints, measures)
+ }
+}
+
+#[derive(Clone, Debug)]
+pub struct MeasuresJoinHints { // join hints shared by a query plus one hint set per measure
+ base_hints: JoinHints, // hints that do not depend on any measure
+ measure_hints: Vec, // one entry per measure kept by from_base_hints, in input order
+}
+
+impl MeasuresJoinHints {
+ pub fn empty() -> Self { // no base hints and no measures
+ Self {
+ base_hints: JoinHints::new(),
+ measure_hints: vec![],
+ }
+ }
+
+ pub fn builder(query_join_hints: &JoinHints) -> MeasuresJoinHintsBuilder { // builder seeded with a clone of `query_join_hints`
+ MeasuresJoinHintsBuilder {
+ initial_hints: query_join_hints.clone(),
+ dimensions: Vec::new(),
+ filters: Vec::new(),
+ }
+ }
+
+ pub fn for_measures(&self, measures: &[Rc]) -> Result { // same base hints, recomputed for another set of measures
+ Self::from_base_hints(self.base_hints.clone(), measures)
+ }
+
+ fn from_base_hints( // derives the per-measure hint sets from shared base hints
+ base_hints: JoinHints,
+ measures: &[Rc],
+ ) -> Result {
+ let mut filtered_measures = Vec::new();
+ for m in measures {
+ if !has_multi_stage_members(m, true)? { // measures for which the helper returns true are dropped; the meaning of `true` is defined by the helper, not visible here
+ filtered_measures.push(m.clone());
+ }
+ }
+
+ let measure_hints: Vec = filtered_measures
+ .iter()
+ .map(|m| -> Result<_, CubeError> {
+ let mut hints = base_hints.clone(); // every measure starts from its own copy of the base hints
+ hints.extend(&collect_join_hints(m)?);
+ Ok(MeasureJoinHints {
+ measure: m.clone(),
+ hints,
+ })
+ })
+ .collect::, _>>()?; // the first error aborts the whole build
+
+ Ok(Self {
+ base_hints,
+ measure_hints,
+ })
+ }
+
+ pub fn base_hints(&self) -> &JoinHints { // borrowed access to the shared hints
+ &self.base_hints
+ }
+
+ pub fn measure_hints(&self) -> &[MeasureJoinHints] { // borrowed access to the per-measure hints
+ &self.measure_hints
+ }
+
+ pub fn hints_for_measure(&self, measure: &MemberSymbol) -> Option { // clone of the hints of the first entry with the same full_name; None when there is no such entry
+ self.measure_hints
+ .iter()
+ .find(|mh| mh.measure.full_name() == measure.full_name())
+ .map(|mh| mh.hints.clone())
+ }
+}
+
+// --- MultiFactJoinGroups: builds actual join trees ---
+
+#[derive(Clone)]
+pub struct MultiFactJoinGroups { // measures grouped by the join they resolve to, plus precomputed join paths
+ query_tools: Rc, // kept so for_measures can rebuild the groups
+ measures_join_hints: MeasuresJoinHints, // the hints the groups were built from
+ groups: Vec<(Rc, Vec>)>, // (join definition, measures resolved to it), in first-seen order of the join key
+ /// cube_name → join path from root, computed from the first group (shared for dimensions).
+ dimension_paths: HashMap>,
+ /// measure full_name → join path from root, computed per group.
+ measure_paths: HashMap>,
+}
+
+impl MultiFactJoinGroups {
+ pub fn empty(query_tools: Rc) -> Self { // instance with no groups and no precomputed paths
+ Self {
+ query_tools,
+ measures_join_hints: MeasuresJoinHints::empty(),
+ groups: vec![],
+ dimension_paths: HashMap::new(),
+ measure_paths: HashMap::new(),
+ }
+ }
+
+ pub fn try_new( // builds the join groups for the given hints, then precomputes the path lookup maps
+ query_tools: Rc,
+ measures_join_hints: MeasuresJoinHints,
+ ) -> Result {
+ let groups = Self::build_groups(&query_tools, &measures_join_hints)?; // errors from join resolution are propagated
+ let (dimension_paths, measure_paths) = Self::precompute_paths(&groups)?;
+ Ok(Self {
+ query_tools,
+ measures_join_hints,
+ groups,
+ dimension_paths,
+ measure_paths,
+ })
+ }
+
+ pub fn for_measures(&self, measures: &[Rc]) -> Result { // new instance for `measures`, reusing this instance's base hints and query tools
+ let new_hints = self.measures_join_hints.for_measures(measures)?;
+ Self::try_new(self.query_tools.clone(), new_hints)
+ }
+
+ fn build_groups( // resolves one join per measure and merges measures whose joins share a key
+ query_tools: &Rc,
+ hints: &MeasuresJoinHints,
+ ) -> Result, Vec>)>, CubeError> {
+ let measures_to_join = if hints.measure_hints.is_empty() {
+ if hints.base_hints.is_empty() {
+ vec![] // nothing to join: no measures and no base hints
+ } else {
+ let (key, join) = query_tools.join_for_hints(&hints.base_hints)?;
+ vec![(Vec::new(), key, join)] // a single group with an empty measure list, built from the base hints alone
+ }
+ } else {
+ hints
+ .measure_hints
+ .iter()
+ .map(|mh| -> Result<_, CubeError> {
+ let (key, join) = query_tools.join_for_hints(&mh.hints)?;
+ Ok((vec![mh.measure.clone()], key, join))
+ })
+ .collect::, _>>()?
+ };
+
+ let mut key_order: Vec = Vec::new(); // remembers the first-seen order of keys, which the HashMap does not keep
+ let mut grouped: HashMap, Vec>)> =
+ HashMap::new();
+ for (measures, key, join) in measures_to_join {
+ if let Some(entry) = grouped.get_mut(&key) {
+ entry.1.extend(measures); // key already seen: keep the first join, add the measures to it
+ } else {
+ key_order.push(key.clone());
+ grouped.insert(key, (join, measures));
+ }
+ }
+
+ Ok(key_order
+ .into_iter()
+ .map(|key| grouped.remove(&key).unwrap()) // cannot fail: each key is pushed to key_order exactly once, together with its insert into `grouped`
+ .collect())
+ }
+
+ pub fn measures_join_hints(&self) -> &MeasuresJoinHints { // the hints this instance was built from
+ &self.measures_join_hints
+ }
+
+ pub fn is_multi_fact(&self) -> bool { // true when the measures resolved to more than one join group
+ self.groups.len() > 1
+ }
+
+ pub fn groups(&self) -> &[(Rc, Vec>)] { // (join definition, measures) pairs in the order produced by build_groups
+ &self.groups
+ }
+
+ pub fn num_groups(&self) -> usize { // number of join groups
+ self.groups.len()
+ }
+
+ /// Returns the join path from root to the dimension's cube, or `None` when that cube has no precomputed path.
+ /// Precomputed from the first group (dimension paths are identical across all groups).
+ pub fn resolve_join_path_for_dimension(
+ &self,
+ dimension: &Rc,
+ ) -> Option<&Vec> {
+ self.dimension_paths
+ .get(&dimension.clone().resolve_reference_chain().cube_name()) // keyed by the cube name of the dimension after its reference chain is resolved
+ }
+
+ /// Returns the join path from root to the measure's cube, or `None` when the measure has no precomputed path.
+ /// Precomputed per measure from its group's JoinDefinition.
+ pub fn resolve_join_path_for_measure(
+ &self,
+ measure: &Rc,
+ ) -> Option<&Vec> {
+ self.measure_paths
+ .get(&measure.clone().resolve_reference_chain().full_name()) // NOTE(review): precompute_paths keys this map by m.full_name() without resolving the reference chain — confirm both yield the same key
+ }
+
+ fn precompute_paths( // returns (dimension_paths, measure_paths) for the given groups
+ groups: &[(Rc, Vec>)],
+ ) -> Result<(HashMap>, HashMap>), CubeError> {
+ let dimension_paths = if groups.is_empty() {
+ HashMap::new()
+ } else {
+ Self::build_cube_paths(&*groups[0].0)? // dimension paths come from the first group's join only
+ };
+
+ let mut measure_paths = HashMap::new();
+ for (join, measures) in groups {
+ if measures.is_empty() {
+ continue; // a group without measures contributes no measure paths
+ }
+ let cube_paths = Self::build_cube_paths(&**join)?;
+ for m in measures {
+ if let Some(path) = cube_paths.get(&m.cube_name()) { // a measure whose cube is absent from this join's paths gets no entry
+ measure_paths.insert(m.full_name(), path.clone());
+ }
+ }
+ }
+
+ Ok((dimension_paths, measure_paths))
+ }
+
+ fn build_cube_paths( // maps each cube reachable in `join` to the list of cube names leading to it from the root
+ join: &dyn JoinDefinition,
+ ) -> Result>, CubeError> {
+ let root = join.static_data().root.clone();
+ let mut paths: HashMap> = HashMap::new();
+ paths.insert(root.clone(), vec![root]); // the root's path is just the root itself
+
+ for join_item in join.joins()? {
+ let sd = join_item.static_data();
+ let parent_path = paths
+ .get(&sd.original_from)
+ .cloned()
+ .unwrap_or_else(|| vec![sd.original_from.clone()]); // NOTE(review): fallback when the parent has no path yet; a full path from root relies on joins() listing parents before children — confirm
+ let mut path = parent_path;
+ path.push(sd.original_to.clone());
+ paths.insert(sd.original_to.clone(), path); // a later join item targeting the same cube overwrites the earlier path
+ }
+
+ Ok(paths)
+ }
+
+ pub fn single_join(&self) -> Result