From e00fe125a546926e85b5c1c1be70a46fabcd53ff Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 17 Mar 2026 12:53:44 +0100 Subject: [PATCH 1/7] fix(query-orchestrator): Return cached result when refreshKey changes during must-revalidate (#10489) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When `refreshKey` changes faster than the query executes (e.g. `SELECT NOW()` with a slow query), `must-revalidate` gets stuck in an endless `Continue wait` loop. Each time the query finishes, the refreshKey has already changed — so it re-runs the query, which never catches up. Fix: Track which request produced the cached result. If the same request flow already has a result in cache but the refreshKey changed during execution — return the cached data immediately and refresh in the background, instead of blocking and re-executing. --- .../src/orchestrator/LocalCacheDriver.ts | 6 + .../src/orchestrator/QueryCache.ts | 38 ++- .../test/unit/PreAggregations.test.ts | 64 ++-- .../test/unit/QueryCache.abstract.ts | 275 +++++++++++++++++- 4 files changed, 335 insertions(+), 48 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts index 49877b9894807..d5147f27228d0 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalCacheDriver.ts @@ -47,6 +47,12 @@ export class LocalCacheDriver implements CacheDriverInterface { // Nothing to do } + public reset(): void { + for (const key of Object.keys(this.store)) { + delete this.store[key]; + } + } + public async testConnection(): Promise { // Nothing to do } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index a33eae2f27799..c4919daa1bb6e 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -128,6 +128,7 @@ type CacheEntry = { time: number; result: any; renewalKey: string; + requestId?: string; }; export interface QueryCacheOptions { @@ -301,6 +302,7 @@ export class QueryCache { requestId: queryBody.requestId, dataSource: queryBody.dataSource, persistent: queryBody.persistent, + skipRefreshKeyWaitForRenew: true, } ); } @@ -405,6 +407,12 @@ export class QueryCache { return key; } + public static extractRequestUUID(requestId: string): string { + const idx = requestId.lastIndexOf('-span-'); + + return idx !== -1 ? requestId.substring(0, idx) : requestId; + } + protected static replaceAll(replaceThis, withThis, inThis) { withThis = withThis.replace(/\$/g, '$$$$'); return inThis.replace( @@ -903,7 +911,8 @@ export class QueryCache { const result = { time: (new Date()).getTime(), result: res, - renewalKey + renewalKey, + requestId: options.requestId, }; return this .cacheDriver @@ -1003,14 +1012,24 @@ export class QueryCache { primaryQuery, renewCycle }); - if ( - renewalKey && ( - !renewalThreshold || - !parsedResult.time || - renewedAgo > renewalThreshold * 1000 || - parsedResult.renewalKey !== renewalKey - ) - ) { + + const isExpired = !renewalThreshold || !parsedResult.time || renewedAgo > renewalThreshold * 1000; + const isKeyMismatch = renewalKey && parsedResult.renewalKey !== renewalKey; + const isSameRequest = options.requestId && parsedResult.requestId && + QueryCache.extractRequestUUID(parsedResult.requestId) === QueryCache.extractRequestUUID(options.requestId); + + // Continue-wait cycle: result was produced by our request, + // refreshKey changed during execution — return cached, refresh in background. + // Skip for renewCycle — it must always fetch fresh data to keep cache up-to-date. + if (isSameRequest && !renewCycle && (isExpired || isKeyMismatch)) { + this.logger('Same request cache hit (background refresh)', { cacheKey, renewalThreshold, requestId: options.requestId, spanId, primaryQuery, renewCycle }); + fetchNew().catch(e => { + if (!(e instanceof ContinueWaitError)) { + this.logger('Error renewing', { cacheKey, error: e.stack || e, requestId: options.requestId, spanId, primaryQuery, renewCycle }); + } + }); + } else if (renewalKey && (isExpired || isKeyMismatch)) { + // Cache expired or refreshKey changed — need to refresh if (options.waitForRenew) { this.logger('Waiting for renew', { cacheKey, renewalThreshold, requestId: options.requestId, spanId, primaryQuery, renewCycle }); return fetchNew(); @@ -1023,6 +1042,7 @@ export class QueryCache { }); } } + this.logger('Using cache for', { cacheKey, requestId: options.requestId, spanId, primaryQuery, renewCycle }); if (options.useInMemory && renewedAgo + inMemoryCacheDisablePeriod <= renewalThreshold * 1000) { this.memoryCache.set(redisKey, parsedResult); diff --git a/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts b/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts index 3c021ee41cb2b..f8e31b4f4eb97 100644 --- a/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts +++ b/packages/cubejs-query-orchestrator/test/unit/PreAggregations.test.ts @@ -1,13 +1,13 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ -/* eslint-disable global-require */ -import R from 'ramda'; import { BUILD_RANGE_END_LOCAL, BUILD_RANGE_START_LOCAL, FROM_PARTITION_RANGE, TO_PARTITION_RANGE } from '@cubejs-backend/shared'; -import { PreAggregationPartitionRangeLoader, PreAggregations, version } from '../../src'; +import crypto from 'crypto'; + +import { PreAggregationPartitionRangeLoader, PreAggregations, QueryCache, LocalCacheDriver, version } from '../../src'; class MockDriver { public tables: string[] = []; @@ -123,34 +123,32 @@ describe('PreAggregations', () => { let mockExternalDriverFactory: (() => Promise) | null = null; let queryCache: any = null; - const basicQuery: any = { + const defaultCacheKeyQuery: [string, any[], Record] = ['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', [], { + renewalThreshold: 10, + external: false, + }]; + + const createBasicQuery = (overrides: Record = {}): any => ({ query: 'SELECT "orders__created_at_week" "orders__created_at_week", sum("orders__count") "orders__count" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20191101) as partition_union WHERE ("orders__created_at_week" >= ($1::timestamptz::timestamptz AT TIME ZONE \'UTC\') AND "orders__created_at_week" <= ($2::timestamptz::timestamptz AT TIME ZONE \'UTC\')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000', values: ['2019-11-01T00:00:00Z', '2019-11-30T23:59:59Z'], cacheKeyQueries: { renewalThreshold: 21600, - queries: [['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', [], { - renewalThreshold: 10, - external: false, - }]] + queries: [defaultCacheKeyQuery] }, preAggregations: [{ preAggregationsSchema: 'stb_pre_aggregations', tableName: 'stb_pre_aggregations.orders_number_and_count20191101', loadSql: ['CREATE TABLE stb_pre_aggregations.orders_number_and_count20191101 AS SELECT\n date_trunc(\'week\', ("orders".created_at::timestamptz AT TIME ZONE \'UTC\')) "orders__created_at_week", count("orders".id) "orders__count", sum("orders".number) "orders__number"\n FROM\n public.orders AS "orders"\n WHERE ("orders".created_at >= $1::timestamptz AND "orders".created_at <= $2::timestamptz) GROUP BY 1', ['2019-11-01T00:00:00Z', '2019-11-30T23:59:59Z']], - invalidateKeyQueries: [['SELECT date_trunc(\'hour\', (NOW()::timestamptz AT TIME ZONE \'UTC\')) as current_hour', [], { - renewalThreshold: 10, - external: false, - }]] + invalidateKeyQueries: [defaultCacheKeyQuery], }], - requestId: 'basic' - }; + requestId: 'basic', + ...overrides, + }); - const basicQueryExternal = R.clone(basicQuery); - basicQueryExternal.preAggregations[0].external = true; - const basicQueryWithRenew = R.clone(basicQuery); - basicQueryWithRenew.renewQuery = true; - const basicQueryExternalWithRenew = R.clone(basicQueryExternal); - basicQueryExternalWithRenew.renewQuery = true; + const basicQuery: any = createBasicQuery(); + const basicQueryExternal = createBasicQuery({ preAggregations: [{ ...basicQuery.preAggregations[0], external: true }] }); + const basicQueryWithRenew = createBasicQuery({ renewQuery: true }); + const basicQueryExternalWithRenew = createBasicQuery({ preAggregations: [{ ...basicQuery.preAggregations[0], external: true }], renewQuery: true }); beforeEach(() => { mockDriver = new MockDriver(); @@ -167,11 +165,6 @@ describe('PreAggregations', () => { return driver; }; - jest.resetModules(); - - // Dynamic require after resetModules to ensure fresh module state - // eslint-disable-next-line @typescript-eslint/no-var-requires - const { QueryCache } = require('../../src/orchestrator/QueryCache'); queryCache = new QueryCache( 'TEST', mockDriverFactory as any, @@ -179,12 +172,15 @@ describe('PreAggregations', () => { () => {}, { cacheAndQueueDriver: 'memory', - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), }, ); + + // Reset the shared in-memory cache store between tests + (queryCache.getCacheDriver() as LocalCacheDriver).reset(); }); describe('loadAllPreAggregationsIfNeeded', () => { @@ -198,7 +194,7 @@ describe('PreAggregations', () => { () => {}, queryCache!, { - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), @@ -225,7 +221,7 @@ describe('PreAggregations', () => { () => {}, queryCache!, { - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), @@ -252,7 +248,7 @@ describe('PreAggregations', () => { () => {}, queryCache!, { - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), @@ -279,7 +275,7 @@ describe('PreAggregations', () => { () => {}, queryCache!, { - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), @@ -310,7 +306,7 @@ describe('PreAggregations', () => { () => {}, queryCache!, { - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), @@ -344,7 +340,7 @@ describe('PreAggregations', () => { () => {}, queryCache!, { - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), @@ -395,7 +391,7 @@ describe('PreAggregations', () => { () => {}, queryCache!, { - queueOptions: () => ({ + queueOptions: async () => ({ executionTimeout: 1, concurrency: 2, }), @@ -932,8 +928,6 @@ describe('PreAggregations', () => { // Old implementation (before the unsigned shift fix) // This would hang on certain inputs, but for inputs that don't trigger the bug, // it should produce the same results as the new implementation - const crypto = require('crypto'); - function oldVersion(cacheKey: any): string | null { let result = ''; diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts index 58403e93f6504..5c77fd3333e8c 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts @@ -1,24 +1,34 @@ import crypto from 'crypto'; import { createCancelablePromise, pausePromise } from '@cubejs-backend/shared'; -import { QueryCache, QueryCacheOptions } from '../../src'; +import { CacheKey, CacheKeyItem, QueryCache, QueryCacheOptions } from '../../src'; export type QueryCacheTestOptions = QueryCacheOptions & { beforeAll?: () => Promise, afterAll?: () => Promise, }; +class QueryCacheOpened extends QueryCache { + public readonly logger = jest.fn(super.logger); +} + export const QueryCacheTest = (name: string, options: QueryCacheTestOptions) => { describe(`QueryQueue${name}`, () => { - const cache = new QueryCache( + const cache = new QueryCacheOpened( crypto.randomBytes(16).toString('hex'), + () => { + throw new Error('driverFactory is not implemented, mock should be used...'); + }, jest.fn(() => { - throw new Error('It`s not implemented mock...'); + throw new Error('logger is not implemented, mock should be used...'); }), - jest.fn(), options, ); + beforeEach(() => { + cache.logger.mockClear(); + }); + beforeAll(async () => { if (options?.beforeAll) { await options?.beforeAll(); @@ -110,6 +120,263 @@ export const QueryCacheTest = (name: string, options: QueryCacheTestOptions) => expect(callbackWasExecuted).toEqual(true); }); + describe('cacheQueryResult renewal logic', () => { + const renewalKeyA = QueryCache.queryCacheKey({ query: 'key-a', values: [] }); + const renewalKeyOld = QueryCache.queryCacheKey({ query: 'key-old', values: [] }); + const renewalKeyNew = QueryCache.queryCacheKey({ query: 'key-new', values: [] }); + + const seedCache = async (cacheKey: CacheKey, entry: CacheKeyItem) => { + const redisKey = cache.queryRedisKey(cacheKey); + await cache.getCacheDriver().set(redisKey, entry, 3600); + }; + + const callCacheQueryResult = async ( + cacheKey, + cacheEntry, + opts: { + renewalThreshold?: number; + renewalKey?; + waitForRenew?: boolean; + requestId?: string; + renewCycle?: boolean; + } + ) => { + // cacheQueryResult hashes options.renewalKey via queryRedisKey(), + // and fetchNew() stores that hash in the entry. Replicate that for seeding. + const seededEntry = { + ...cacheEntry, + renewalKey: cacheEntry.renewalKey + ? cache.queryRedisKey(cacheEntry.renewalKey) + : cacheEntry.renewalKey, + }; + await seedCache(cacheKey, seededEntry); + + const fetchNewCalled = { value: false, blocked: false }; + + const spy = jest.spyOn(cache, 'queryWithRetryAndRelease').mockImplementation(async () => { + fetchNewCalled.value = true; + return 'new-result'; + }); + + try { + const result = await cache.cacheQueryResult( + 'SELECT 1', + [], + cacheKey, + 3600, + { + renewalThreshold: opts.renewalThreshold ?? 600, + renewalKey: opts.renewalKey, + waitForRenew: opts.waitForRenew ?? false, + requestId: opts.requestId, + dataSource: 'default', + renewCycle: opts.renewCycle, + } + ); + + fetchNewCalled.blocked = result === 'new-result'; + + return { result, fetchNewCalled: fetchNewCalled.value, blocked: fetchNewCalled.blocked }; + } finally { + spy.mockRestore(); + } + }; + + it('expired + waitForRenew: blocks on fetchNew', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'expired-wait', values: [] }); + const entry = { + time: Date.now() - 700 * 1000, + result: 'cached-data', + renewalKey: renewalKeyA, + }; + + const { result, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyA, + waitForRenew: true, + requestId: 'req-1', + }); + + expect(blocked).toBe(true); + expect(result).toBe('new-result'); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew'); + }); + + it('expired + no waitForRenew: returns cached, background refresh', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'expired-no-wait', values: [] }); + const entry = { + time: Date.now() - 700 * 1000, + result: 'cached-data', + renewalKey: renewalKeyA, + }; + + const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyA, + waitForRenew: false, + requestId: 'req-2', + }); + + expect(result).toBe('cached-data'); + expect(fetchNewCalled).toBe(true); + expect(blocked).toBe(false); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Renewing existing key'); + }); + + it('key mismatch + not expired + waitForRenew: blocks on fetchNew', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'key-mismatch-user', values: [] }); + const entry = { + time: Date.now() - 100 * 1000, + result: 'cached-data', + renewalKey: renewalKeyOld, + }; + + const { result, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyNew, + waitForRenew: true, + renewCycle: false, + requestId: 'req-3', + }); + + expect(blocked).toBe(true); + expect(result).toBe('new-result'); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew'); + }); + + it('key mismatch + not expired + renew cycle: blocks on fetchNew', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'key-mismatch-renew', values: [] }); + const entry = { + time: Date.now() - 100 * 1000, + result: 'cached-data', + renewalKey: renewalKeyOld, + }; + + const { result, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyNew, + waitForRenew: true, + renewCycle: true, + requestId: 'req-4', + }); + + expect(blocked).toBe(true); + expect(result).toBe('new-result'); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew'); + }); + + it('same request + expired: returns cached, background refresh', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-expired', values: [] }); + const entry = { + time: Date.now() - 700 * 1000, + result: 'cached-data', + renewalKey: renewalKeyOld, + requestId: 'abc-123-span-1', + }; + + const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyNew, + waitForRenew: true, + requestId: 'abc-123-span-2', + }); + + expect(result).toBe('cached-data'); + expect(fetchNewCalled).toBe(true); + expect(blocked).toBe(false); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Same request cache hit (background refresh)'); + }); + + it('same request + key mismatch only: returns cached, background refresh', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-key-mismatch', values: [] }); + const entry = { + time: Date.now() - 100 * 1000, + result: 'cached-data', + renewalKey: renewalKeyOld, + requestId: 'conn-456-sub-789-span-aaa', + }; + + const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyNew, + waitForRenew: true, + requestId: 'conn-456-sub-789-span-bbb', + }); + + expect(result).toBe('cached-data'); + expect(fetchNewCalled).toBe(true); + expect(blocked).toBe(false); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Same request cache hit (background refresh)'); + }); + + it('same request + renewCycle + key mismatch: must block on fetchNew (not return stale cache)', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-renew-cycle', values: [] }); + const entry = { + time: Date.now() - 100 * 1000, + result: 'stale-data', + renewalKey: renewalKeyOld, + requestId: 'req-cycle-span-1', + }; + + const { result, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyNew, + waitForRenew: true, + renewCycle: true, + requestId: 'req-cycle-span-2', + }); + + // renewCycle must always fetch fresh data even when requestId matches + expect(blocked).toBe(true); + expect(result).toBe('new-result'); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew'); + }); + + it('same request + renewCycle + expired: must block on fetchNew', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'same-req-renew-cycle-expired', values: [] }); + const entry = { + time: Date.now() - 700 * 1000, + result: 'stale-data', + renewalKey: renewalKeyOld, + requestId: 'req-exp-cycle-span-1', + }; + + const { result, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyNew, + waitForRenew: true, + renewCycle: true, + requestId: 'req-exp-cycle-span-2', + }); + + expect(blocked).toBe(true); + expect(result).toBe('new-result'); + expect(cache.logger.mock.calls.map(c => c[0])).toContain('Waiting for renew'); + }); + + it('key matches + not expired: returns cached, no fetchNew', async () => { + const cacheKey = QueryCache.queryCacheKey({ query: 'key-match-fresh', values: [] }); + const entry = { + time: Date.now() - 100 * 1000, + result: 'cached-data', + renewalKey: renewalKeyA, + }; + + const { result, fetchNewCalled, blocked } = await callCacheQueryResult(cacheKey, entry, { + renewalThreshold: 600, + renewalKey: renewalKeyA, + waitForRenew: true, + requestId: 'req-7', + }); + + expect(result).toBe('cached-data'); + expect(fetchNewCalled).toBe(false); + expect(blocked).toBe(false); + expect(cache.logger.mock.calls.map(c => c[0])).not.toContain('Waiting for renew'); + expect(cache.logger.mock.calls.map(c => c[0])).not.toContain('Renewing existing key'); + }); + }); + it('queryCacheKey format', () => { const key1 = QueryCache.queryCacheKey({ query: 'select data', From d2bbc16af3118b5923a7ea4a32238dc2a9887c89 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 12:54:12 +0100 Subject: [PATCH 2/7] chore: Bump flatted from 3.2.4 to 3.4.1 (#10497) Bumps [flatted](https://github.com/WebReflection/flatted) from 3.2.4 to 3.4.1. - [Commits](https://github.com/WebReflection/flatted/compare/v3.2.4...v3.4.1) --- updated-dependencies: - dependency-name: flatted dependency-version: 3.4.1 dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- yarn.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn.lock b/yarn.lock index e0dac048fd449..0a2bdab390b3a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -14786,9 +14786,9 @@ flatbuffers@23.3.3: integrity sha512-jmreOaAT1t55keaf+Z259Tvh8tR/Srry9K8dgCgvizhKSEr6gLGgaOJI2WFL5fkOpGOGRZwxUrlFn0GCmXUy6g== flatted@^3.1.0: - version "3.2.4" - resolved "https://registry.yarnpkg.com/flatted/-/flatted-3.2.4.tgz#28d9969ea90661b5134259f312ab6aa7929ac5e2" - integrity sha512-8/sOawo8tJ4QOBX8YlQBMxL8+RLZfxMQOif9o0KUKTNTjMYElWPE0r/m5VNFxTRd0NSw8qSy8dajrwX4RYI1Hw== + version "3.4.1" + resolved "https://registry.yarnpkg.com/flatted/-/flatted-3.4.1.tgz#84ccd9579e76e9cc0d246c11d8be0beb019143e6" + integrity sha512-IxfVbRFVlV8V/yRaGzk0UVIcsKKHMSfYw66T/u4nTwlWteQePsxe//LjudR1AMX4tZW3WFCh3Zqa/sjlqpbURQ== flexsearch@^0.7.21: version "0.7.21" From eec59a2e4736d4b3a973abe98028fbfd3af54262 Mon Sep 17 00:00:00 2001 From: waralexrom <108349432+waralexrom@users.noreply.github.com> Date: Tue, 17 Mar 2026 13:35:13 +0100 Subject: [PATCH 3/7] fix(tesseract): Pre-aggregations matching should respect join path (#10500) --- rust/cubesqlplanner/cubesqlplanner/Cargo.toml | 1 + .../optimizers/common/cube_names_collector.rs | 9 +- .../compiled_pre_aggregation.rs | 12 +- .../pre_aggregation/measure_matcher.rs | 1 + .../optimizers/pre_aggregation/optimizer.rs | 53 ++- .../pre_aggregations_compiler.rs | 123 ++++- .../cubesqlplanner/src/planner/join_hints.rs | 118 +++-- .../cubesqlplanner/src/planner/mod.rs | 1 + .../src/planner/multi_fact_join_groups.rs | 433 ++++++++++++++++++ .../multiplied_measures_query_planner.rs | 20 +- .../src/planner/query_properties.rs | 177 +------ .../cubesqlplanner/src/planner/query_tools.rs | 111 +---- .../src/planner/sql_evaluator/compiler.rs | 47 +- .../symbols/common/symbol_path.rs | 23 +- .../test_fixtures/cube_bridge/mock_schema.rs | 45 +- .../cube_bridge/yaml/base_query_options.rs | 2 + .../test_fixtures/cube_bridge/yaml/schema.rs | 6 +- .../schemas/yaml_files/common/multi_fact.yaml | 73 +++ .../common/multiple_join_paths.yaml | 93 ++++ .../rollup_join_calculated_measures.yaml | 115 +++++ .../test_fixtures/test_utils/test_context.rs | 15 + .../src/tests/common_sql_generation.rs | 65 +++ .../tests/pre_aggregation_sql_generation.rs | 46 ++ ...act_two_measures_from_different_cubes.snap | 24 + 24 files changed, 1233 insertions(+), 380 deletions(-) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/planner/multi_fact_join_groups.rs create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_fact.yaml create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multiple_join_paths.yaml create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/rollup_join_calculated_measures.yaml create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__common_sql_generation__multi_fact_two_measures_from_different_cubes.snap diff --git a/rust/cubesqlplanner/cubesqlplanner/Cargo.toml b/rust/cubesqlplanner/cubesqlplanner/Cargo.toml index 264a7159f7829..054d07612b296 100644 --- a/rust/cubesqlplanner/cubesqlplanner/Cargo.toml +++ b/rust/cubesqlplanner/cubesqlplanner/Cargo.toml @@ -23,6 +23,7 @@ regex = "1.3.9" typed-builder = "0.21.2" indoc = "2.0.7" + [dev-dependencies] petgraph = "0.6" serde_yaml = "0.9" diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs index 3be34c87a8d70..63d2260c55b9e 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/common/cube_names_collector.rs @@ -2,17 +2,16 @@ use crate::logical_plan::visitor::*; use crate::logical_plan::*; use cubenativeutils::CubeError; use itertools::Itertools; -use std::collections::HashSet; use std::rc::Rc; struct CubeNamesCollector { - cube_names: HashSet, + cube_names: Vec, } impl LogicalNodeVisitor for CubeNamesCollector { fn process_node(&mut self, node: &PlanNode) -> Result<(), CubeError> { if let PlanNode::Cube(cube) = node { - self.cube_names.insert(cube.name().clone()); + self.cube_names.push(cube.name().clone()); } Ok(()) } @@ -22,9 +21,9 @@ pub fn collect_cube_names_from_node( node: &Rc, ) -> Result, CubeError> { let mut collector = CubeNamesCollector { - cube_names: HashSet::new(), + cube_names: Vec::new(), }; let visitor = LogicalPlanVisitor::new(); visitor.visit(&mut collector, node)?; - Ok(collector.cube_names.into_iter().collect_vec()) + Ok(collector.cube_names.into_iter().unique().collect_vec()) } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs index 8ee5d10c4e9df..dd8a0bffc962f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs @@ -1,8 +1,9 @@ +use crate::planner::multi_fact_join_groups::MultiFactJoinGroups; use crate::planner::sql_evaluator::{MemberSymbol, SqlCall}; use std::fmt::Debug; use std::rc::Rc; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct PreAggregationJoinItem { pub from: Rc, pub to: Rc, @@ -11,18 +12,18 @@ pub struct PreAggregationJoinItem { pub on_sql: Rc, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct PreAggregationJoin { pub root: Rc, pub items: Vec, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct PreAggregationUnion { pub items: Vec>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct PreAggregationTable { pub cube_name: String, pub cube_alias: String, @@ -30,7 +31,7 @@ pub struct PreAggregationTable { pub alias: Option, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum PreAggregationSource { Single(PreAggregationTable), Join(PreAggregationJoin), @@ -49,6 +50,7 @@ pub struct CompiledPreAggregation { pub time_dimensions: Vec>, pub segments: Vec>, pub allow_non_strict_date_range_match: bool, + pub multi_fact_join_groups: MultiFactJoinGroups, } impl Debug for CompiledPreAggregation { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs index 6d0aa5a61480a..3d5803cff715f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/measure_matcher.rs @@ -3,6 +3,7 @@ use crate::planner::sql_evaluator::MemberSymbol; use cubenativeutils::CubeError; use std::collections::HashSet; use std::rc::Rc; + pub struct MeasureMatcher { only_addictive: bool, pre_aggregation_measures: HashSet, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 7fcb9477b654c..9b8548cb17456 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -3,6 +3,8 @@ use super::*; use crate::logical_plan::visitor::{LogicalPlanRewriter, NodeRewriteResult}; use crate::logical_plan::*; use crate::plan::FilterItem; +use crate::planner::join_hints::JoinHints; +use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups}; use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::MemberSymbol; use cubenativeutils::CubeError; @@ -462,11 +464,58 @@ impl PreAggregationOptimizer { if match_state == MatchState::NotMatched { return Ok(None); } - self.try_match_measures( + let matched = self.try_match_measures( &all_measures, pre_aggregation, match_state == MatchState::Partial, - ) + )?; + if matched.is_none() { + return Ok(None); + } + + if !self.are_join_paths_matching(schema, &all_measures, pre_aggregation)? { + return Ok(None); + } + + Ok(matched) + } + + fn are_join_paths_matching( + &self, + schema: &Rc, + measures: &[Rc], + pre_aggregation: &CompiledPreAggregation, + ) -> Result { + let query_hints = MeasuresJoinHints::builder(&JoinHints::new()) + .add_dimensions(&schema.dimensions) + .add_dimensions(&schema.time_dimensions) + .build(measures)?; + let query_groups = MultiFactJoinGroups::try_new(self.query_tools.clone(), query_hints)?; + let pre_aggr_groups = &pre_aggregation.multi_fact_join_groups; + + for dim in schema + .dimensions + .iter() + .chain(schema.time_dimensions.iter()) + { + let query_path = query_groups.resolve_join_path_for_dimension(dim); + let pre_aggr_path = pre_aggr_groups.resolve_join_path_for_dimension(dim); + match (query_path, pre_aggr_path) { + (Some(qp), Some(pp)) if qp != pp => return Ok(false), + _ => {} + } + } + + for measure in measures.iter() { + let query_path = query_groups.resolve_join_path_for_measure(measure); + let pre_aggr_path = pre_aggr_groups.resolve_join_path_for_measure(measure); + match (query_path, pre_aggr_path) { + (Some(qp), Some(pp)) if qp != pp => return Ok(false), + _ => {} + } + } + + Ok(true) } fn try_match_measures( diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs index 22bec416c1f99..cb667f3905014 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs @@ -8,10 +8,10 @@ use crate::logical_plan::PreAggregationJoinItem; use crate::logical_plan::PreAggregationTable; use crate::logical_plan::PreAggregationUnion; use crate::planner::join_hints::JoinHints; +use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups}; use crate::planner::planners::JoinPlanner; use crate::planner::planners::ResolvedJoinItem; use crate::planner::query_tools::QueryTools; -use crate::planner::sql_evaluator::collectors::collect_cube_names_from_symbols; use crate::planner::sql_evaluator::MemberSymbol; use crate::planner::sql_evaluator::TimeDimensionSymbol; use crate::planner::GranularityHelper; @@ -180,6 +180,14 @@ impl PreAggregationsCompiler { .static_data() .allow_non_strict_date_range_match .unwrap_or(false); + + let measures_join_hints = MeasuresJoinHints::builder(&JoinHints::new()) + .add_dimensions(&dimensions) + .add_dimensions(&time_dimensions) + .build(&measures)?; + let multi_fact_join_groups = + MultiFactJoinGroups::try_new(self.query_tools.clone(), measures_join_hints)?; + let rollups = if let Some(refs) = description.rollup_references()? { let r = self .query_tools @@ -191,7 +199,17 @@ impl PreAggregationsCompiler { }; let source = if static_data.pre_aggregation_type == "rollupJoin" { - PreAggregationSource::Join(self.build_join_source(&measures, &dimensions, &rollups)?) + let all_dimensions = dimensions + .iter() + .cloned() + .chain(time_dimensions.iter().cloned()) + .chain(segments.iter().cloned()) + .collect_vec(); + PreAggregationSource::Join(self.build_join_source( + &measures, + &all_dimensions, + &rollups, + )?) } else { let cube = self .query_tools @@ -221,6 +239,7 @@ impl PreAggregationsCompiler { time_dimensions, segments, allow_non_strict_date_range_match, + multi_fact_join_groups, }); self.compiled_cache.insert(name.clone(), res.clone()); Ok(res) @@ -283,6 +302,7 @@ impl PreAggregationsCompiler { .allow_non_strict_date_range_match .unwrap_or(false); let granularity = pre_aggrs_for_lambda[0].granularity.clone(); + let multi_fact_join_groups = pre_aggrs_for_lambda[0].multi_fact_join_groups.clone(); let source = PreAggregationSource::Union(PreAggregationUnion { items: sources }); let static_data = description.static_data(); @@ -297,6 +317,7 @@ impl PreAggregationsCompiler { time_dimensions, segments, allow_non_strict_date_range_match, + multi_fact_join_groups, }); self.compiled_cache.insert(name.clone(), res.clone()); Ok(res) @@ -332,23 +353,30 @@ impl PreAggregationsCompiler { Ok(()) } + fn join_hints_from_pre_aggregation(&self, symbols: &Vec>) -> JoinHints { + let mut result = JoinHints::new(); + for symbol in symbols { + let path = symbol.path(); + if path.len() == 1 { + result.push(JoinHintItem::Single(path[0].clone())); + } else { + result.push(JoinHintItem::Vector(path.clone())); + } + } + result + } fn build_join_source( &mut self, measures: &Vec>, - dimensions: &Vec>, + all_dimensions: &Vec>, rollups: &Vec, ) -> Result { let all_symbols = measures .iter() .cloned() - .chain(dimensions.iter().cloned()) + .chain(all_dimensions.iter().cloned()) .collect_vec(); - let pre_aggr_join_hints = JoinHints::from_items( - collect_cube_names_from_symbols(&all_symbols)? - .into_iter() - .map(|v| JoinHintItem::Single(v)) - .collect_vec(), - ); + let pre_aggr_join_hints = self.join_hints_from_pre_aggregation(&all_symbols); let join_planner = JoinPlanner::new(self.query_tools.clone()); let pre_aggrs_for_join = rollups @@ -366,16 +394,13 @@ impl PreAggregationsCompiler { .iter() .cloned() .chain(join_pre_aggr.dimensions.iter().cloned()) + .chain(join_pre_aggr.time_dimensions.iter().cloned()) + .chain(join_pre_aggr.segments.iter().cloned()) .collect_vec(); - let join_pre_aggr_join_hints = JoinHints::from_items( - collect_cube_names_from_symbols(&all_symbols)? - .into_iter() - .map(|v| JoinHintItem::Single(v)) - .collect_vec(), - ); - existing_joins.append( - &mut join_planner.resolve_join_members_by_hints(&join_pre_aggr_join_hints)?, - ); + let join_pre_aggr_join_hints = self.join_hints_from_pre_aggregation(&all_symbols); + let mut existing = + join_planner.resolve_join_members_by_hints(&join_pre_aggr_join_hints)?; + existing_joins.append(&mut existing); } let not_existing_joins = target_joins @@ -395,6 +420,7 @@ impl PreAggregationsCompiler { .iter() .map(|item| self.make_pre_aggregation_join_item(&pre_aggrs_for_join, item)) .collect::, _>>()?; + let res = PreAggregationJoin { root: items[0].from.clone(), items, @@ -821,4 +847,63 @@ mod tests { assert_eq!(compiled.dimensions.len(), 1); assert_eq!(compiled.dimensions[0].full_name(), "orders.status"); } + + #[test] + fn test_compile_rollup_join_calculated_measures() { + let schema = MockSchema::from_yaml_file("common/rollup_join_calculated_measures.yaml"); + let test_context = TestContext::new(schema).unwrap(); + let query_tools = test_context.query_tools().clone(); + + let cube_names = vec![ + "line_items".to_string(), + "facts".to_string(), + "campaigns".to_string(), + ]; + let mut compiler = PreAggregationsCompiler::try_new(query_tools, &cube_names).unwrap(); + + let pre_agg_name = PreAggregationFullName::new( + "line_items".to_string(), + "combined_rollup_join".to_string(), + ); + let compiled = compiler.compile_pre_aggregation(&pre_agg_name).unwrap(); + + // Check source is Join with correct structure + match compiled.source.as_ref() { + PreAggregationSource::Join(join) => { + // Root should be li_rollup + match join.root.as_ref() { + PreAggregationSource::Single(table) => { + assert_eq!(table.name, "li_rollup", "Root should be li_rollup"); + } + _ => panic!("Expected Single source for root"), + } + // Should have 2 join items (line_items→facts, line_items→campaigns) + assert_eq!( + join.items.len(), + 2, + "Should have 2 join items, got {}", + join.items.len() + ); + let to_names: Vec = join + .items + .iter() + .map(|item| match item.to.as_ref() { + PreAggregationSource::Single(table) => table.name.clone(), + _ => panic!("Expected Single source"), + }) + .collect(); + assert!( + to_names.contains(&"facts_rollup".to_string()), + "Should join to facts_rollup, got: {:?}", + to_names + ); + assert!( + to_names.contains(&"campaigns_rollup".to_string()), + "Should join to campaigns_rollup, got: {:?}", + to_names + ); + } + _ => panic!("Expected PreAggregationSource::Join"), + } + } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs index 109b22a671c6a..f590b21e32274 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/join_hints.rs @@ -10,22 +10,28 @@ impl JoinHints { Self { items: Vec::new() } } - pub fn from_items(mut items: Vec) -> Self { - items.sort(); - items.dedup(); + pub fn from_items(items: Vec) -> Self { Self { items } } - pub fn insert(&mut self, item: JoinHintItem) { - match self.items.binary_search(&item) { - Ok(_) => {} - Err(pos) => self.items.insert(pos, item), + pub fn push(&mut self, item: JoinHintItem) { + if let JoinHintItem::Single(ref name) = item { + if let Some(last) = self.items.last() { + let redundant = match last { + JoinHintItem::Single(s) => s == name, + JoinHintItem::Vector(v) => v.last() == Some(name), + }; + if redundant { + return; + } + } } + self.items.push(item); } pub fn extend(&mut self, other: &JoinHints) { - for item in other.items.iter() { - self.insert(item.clone()); + for item in &other.items { + self.push(item.clone()); } } @@ -71,8 +77,6 @@ impl<'a> IntoIterator for &'a JoinHints { #[cfg(test)] mod tests { use super::*; - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; fn s(name: &str) -> JoinHintItem { JoinHintItem::Single(name.to_string()) @@ -82,68 +86,88 @@ mod tests { JoinHintItem::Vector(names.iter().map(|n| n.to_string()).collect()) } - fn hash_of(h: &JoinHints) -> u64 { - let mut hasher = DefaultHasher::new(); - h.hash(&mut hasher); - hasher.finish() - } - #[test] - fn test_from_items_normalizes_and_deduplicates() { - let hints = JoinHints::from_items(vec![ - s("orders"), - v(&["users", "orders"]), - s("orders"), - s("abc"), - ]); + fn test_from_items_preserves_order() { + let hints = JoinHints::from_items(vec![s("orders"), v(&["users", "orders"]), s("abc")]); assert_eq!(hints.len(), 3); - // sorted: Single comes before Vector for same content, and Singles are alphabetical - assert_eq!(hints.items()[0], s("abc")); - assert_eq!(hints.items()[1], s("orders")); - assert_eq!(hints.items()[2], v(&["users", "orders"])); - - // Different insertion order → same result - let hints2 = JoinHints::from_items(vec![s("abc"), v(&["users", "orders"]), s("orders")]); - assert_eq!(hints, hints2); - assert_eq!(hash_of(&hints), hash_of(&hints2)); + assert_eq!(hints.items()[0], s("orders")); + assert_eq!(hints.items()[1], v(&["users", "orders"])); + assert_eq!(hints.items()[2], s("abc")); } #[test] - fn test_insert_and_extend_preserve_invariant() { + fn test_push_and_extend() { let mut a = JoinHints::new(); assert!(a.is_empty()); - a.insert(s("orders")); - a.insert(s("abc")); - a.insert(s("orders")); // duplicate + a.push(s("orders")); + a.push(s("abc")); assert_eq!(a.len(), 2); - assert_eq!(a.items()[0], s("abc")); - assert_eq!(a.items()[1], s("orders")); - let b = JoinHints::from_items(vec![s("orders"), v(&["a", "b"]), s("zzz")]); + let b = JoinHints::from_items(vec![s("zzz"), v(&["a", "b"])]); a.extend(&b); assert_eq!(a.len(), 4); - // abc, orders, zzz (Singles sorted), then Vector - assert_eq!(a.items()[0], s("abc")); - assert_eq!(a.items()[1], s("orders")); + assert_eq!(a.items()[0], s("orders")); + assert_eq!(a.items()[1], s("abc")); assert_eq!(a.items()[2], s("zzz")); assert_eq!(a.items()[3], v(&["a", "b"])); } + #[test] + fn test_extend_skips_redundant_at_boundary() { + let mut a = JoinHints::new(); + a.push(s("orders")); + + let b = JoinHints::from_items(vec![s("orders"), s("abc")]); + a.extend(&b); + assert_eq!(a.len(), 2); + assert_eq!(a.items()[0], s("orders")); + assert_eq!(a.items()[1], s("abc")); + + let mut c = JoinHints::new(); + c.push(v(&["x", "abc"])); + let d = JoinHints::from_items(vec![s("abc"), s("zzz")]); + c.extend(&d); + assert_eq!( + c.len(), + 2, + "Single after Vector ending with same name is skipped on extend" + ); + assert_eq!(c.items()[0], v(&["x", "abc"])); + assert_eq!(c.items()[1], s("zzz")); + } + + #[test] + fn test_push_skips_redundant_single() { + let mut hints = JoinHints::new(); + hints.push(s("orders")); + hints.push(s("orders")); + assert_eq!(hints.len(), 1); + + hints.push(v(&["users", "orders"])); + hints.push(s("orders")); + assert_eq!( + hints.len(), + 2, + "Single after Vector ending with same name is skipped" + ); + + hints.push(s("abc")); + assert_eq!(hints.len(), 3, "Different Single is added"); + } + #[test] fn test_into_items_and_into_iter() { let hints = JoinHints::from_items(vec![s("b"), s("a"), v(&["x", "y"])]); let cloned = hints.clone(); - // into_iter let collected: Vec<_> = cloned.into_iter().collect(); assert_eq!(collected.len(), 3); - // into_items let items = hints.into_items(); assert_eq!(items.len(), 3); - assert_eq!(items[0], s("a")); - assert_eq!(items[1], s("b")); + assert_eq!(items[0], s("b")); + assert_eq!(items[1], s("a")); } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs index 90cccfb6104ea..72747cbfe84c0 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/mod.rs @@ -3,6 +3,7 @@ pub mod base_join_condition; pub mod base_query; pub mod filter; pub mod join_hints; +pub mod multi_fact_join_groups; pub mod time_dimension; pub mod params_allocator; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/multi_fact_join_groups.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/multi_fact_join_groups.rs new file mode 100644 index 0000000000000..3d6e3756b9296 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/multi_fact_join_groups.rs @@ -0,0 +1,433 @@ +use crate::cube_bridge::join_definition::JoinDefinition; +use crate::plan::FilterItem; +use crate::planner::join_hints::JoinHints; +use crate::planner::query_tools::JoinKey; +use crate::planner::query_tools::QueryTools; +use crate::planner::sql_evaluator::collectors::{collect_join_hints, has_multi_stage_members}; +use crate::planner::sql_evaluator::MemberSymbol; +use cubenativeutils::CubeError; +use itertools::Itertools; +use std::collections::HashMap; +use std::rc::Rc; + +#[derive(Clone, Debug)] +pub struct MeasureJoinHints { + pub measure: Rc, + pub hints: JoinHints, +} + +pub struct MeasuresJoinHintsBuilder { + initial_hints: JoinHints, + dimensions: Vec>, + filters: Vec, +} + +impl MeasuresJoinHintsBuilder { + pub fn add_dimensions(mut self, dims: &[Rc]) -> Self { + self.dimensions.extend(dims.iter().cloned()); + self + } + + pub fn add_filters(mut self, filters: &[FilterItem]) -> Self { + self.filters.extend(filters.iter().cloned()); + self + } + + pub fn build(self, measures: &[Rc]) -> Result { + let mut base_hints = self.initial_hints; + + for sym in self.dimensions.iter() { + base_hints.extend(&collect_join_hints(sym)?); + } + + let mut filter_symbols = Vec::new(); + for item in self.filters.iter() { + item.find_all_member_evaluators(&mut filter_symbols); + } + for sym in filter_symbols.iter() { + base_hints.extend(&collect_join_hints(sym)?); + } + + MeasuresJoinHints::from_base_hints(base_hints, measures) + } +} + +#[derive(Clone, Debug)] +pub struct MeasuresJoinHints { + base_hints: JoinHints, + measure_hints: Vec, +} + +impl MeasuresJoinHints { + pub fn empty() -> Self { + Self { + base_hints: JoinHints::new(), + measure_hints: vec![], + } + } + + pub fn builder(query_join_hints: &JoinHints) -> MeasuresJoinHintsBuilder { + MeasuresJoinHintsBuilder { + initial_hints: query_join_hints.clone(), + dimensions: Vec::new(), + filters: Vec::new(), + } + } + + pub fn for_measures(&self, measures: &[Rc]) -> Result { + Self::from_base_hints(self.base_hints.clone(), measures) + } + + fn from_base_hints( + base_hints: JoinHints, + measures: &[Rc], + ) -> Result { + let mut filtered_measures = Vec::new(); + for m in measures { + if !has_multi_stage_members(m, true)? { + filtered_measures.push(m.clone()); + } + } + + let measure_hints: Vec = filtered_measures + .iter() + .map(|m| -> Result<_, CubeError> { + let mut hints = base_hints.clone(); + hints.extend(&collect_join_hints(m)?); + Ok(MeasureJoinHints { + measure: m.clone(), + hints, + }) + }) + .collect::, _>>()?; + + Ok(Self { + base_hints, + measure_hints, + }) + } + + pub fn base_hints(&self) -> &JoinHints { + &self.base_hints + } + + pub fn measure_hints(&self) -> &[MeasureJoinHints] { + &self.measure_hints + } + + pub fn hints_for_measure(&self, measure: &MemberSymbol) -> Option { + self.measure_hints + .iter() + .find(|mh| mh.measure.full_name() == measure.full_name()) + .map(|mh| mh.hints.clone()) + } +} + +// --- MultiFactJoinGroups: builds actual join trees --- + +#[derive(Clone)] +pub struct MultiFactJoinGroups { + query_tools: Rc, + measures_join_hints: MeasuresJoinHints, + groups: Vec<(Rc, Vec>)>, + /// cube_name → join path from root, computed from the first group (shared for dimensions). + dimension_paths: HashMap>, + /// measure full_name → join path from root, computed per group. + measure_paths: HashMap>, +} + +impl MultiFactJoinGroups { + pub fn empty(query_tools: Rc) -> Self { + Self { + query_tools, + measures_join_hints: MeasuresJoinHints::empty(), + groups: vec![], + dimension_paths: HashMap::new(), + measure_paths: HashMap::new(), + } + } + + pub fn try_new( + query_tools: Rc, + measures_join_hints: MeasuresJoinHints, + ) -> Result { + let groups = Self::build_groups(&query_tools, &measures_join_hints)?; + let (dimension_paths, measure_paths) = Self::precompute_paths(&groups)?; + Ok(Self { + query_tools, + measures_join_hints, + groups, + dimension_paths, + measure_paths, + }) + } + + pub fn for_measures(&self, measures: &[Rc]) -> Result { + let new_hints = self.measures_join_hints.for_measures(measures)?; + Self::try_new(self.query_tools.clone(), new_hints) + } + + fn build_groups( + query_tools: &Rc, + hints: &MeasuresJoinHints, + ) -> Result, Vec>)>, CubeError> { + let measures_to_join = if hints.measure_hints.is_empty() { + if hints.base_hints.is_empty() { + vec![] + } else { + let (key, join) = query_tools.join_for_hints(&hints.base_hints)?; + vec![(Vec::new(), key, join)] + } + } else { + hints + .measure_hints + .iter() + .map(|mh| -> Result<_, CubeError> { + let (key, join) = query_tools.join_for_hints(&mh.hints)?; + Ok((vec![mh.measure.clone()], key, join)) + }) + .collect::, _>>()? + }; + + let mut key_order: Vec = Vec::new(); + let mut grouped: HashMap, Vec>)> = + HashMap::new(); + for (measures, key, join) in measures_to_join { + if let Some(entry) = grouped.get_mut(&key) { + entry.1.extend(measures); + } else { + key_order.push(key.clone()); + grouped.insert(key, (join, measures)); + } + } + + Ok(key_order + .into_iter() + .map(|key| grouped.remove(&key).unwrap()) + .collect()) + } + + pub fn measures_join_hints(&self) -> &MeasuresJoinHints { + &self.measures_join_hints + } + + pub fn is_multi_fact(&self) -> bool { + self.groups.len() > 1 + } + + pub fn groups(&self) -> &[(Rc, Vec>)] { + &self.groups + } + + pub fn num_groups(&self) -> usize { + self.groups.len() + } + + /// Returns the join path from root to the dimension's cube. + /// Precomputed from the first group (dimension paths are identical across all groups). + pub fn resolve_join_path_for_dimension( + &self, + dimension: &Rc, + ) -> Option<&Vec> { + self.dimension_paths + .get(&dimension.clone().resolve_reference_chain().cube_name()) + } + + /// Returns the join path from root to the measure's cube. + /// Precomputed per measure from its group's JoinDefinition. + pub fn resolve_join_path_for_measure( + &self, + measure: &Rc, + ) -> Option<&Vec> { + self.measure_paths + .get(&measure.clone().resolve_reference_chain().full_name()) + } + + fn precompute_paths( + groups: &[(Rc, Vec>)], + ) -> Result<(HashMap>, HashMap>), CubeError> { + let dimension_paths = if groups.is_empty() { + HashMap::new() + } else { + Self::build_cube_paths(&*groups[0].0)? + }; + + let mut measure_paths = HashMap::new(); + for (join, measures) in groups { + if measures.is_empty() { + continue; + } + let cube_paths = Self::build_cube_paths(&**join)?; + for m in measures { + if let Some(path) = cube_paths.get(&m.cube_name()) { + measure_paths.insert(m.full_name(), path.clone()); + } + } + } + + Ok((dimension_paths, measure_paths)) + } + + fn build_cube_paths( + join: &dyn JoinDefinition, + ) -> Result>, CubeError> { + let root = join.static_data().root.clone(); + let mut paths: HashMap> = HashMap::new(); + paths.insert(root.clone(), vec![root]); + + for join_item in join.joins()? { + let sd = join_item.static_data(); + let parent_path = paths + .get(&sd.original_from) + .cloned() + .unwrap_or_else(|| vec![sd.original_from.clone()]); + let mut path = parent_path; + path.push(sd.original_to.clone()); + paths.insert(sd.original_to.clone(), path); + } + + Ok(paths) + } + + pub fn single_join(&self) -> Result>, CubeError> { + if self.groups.is_empty() { + return Ok(None); + } + if self.groups.len() > 1 { + return Err(CubeError::internal(format!( + "Expected just one multi-fact join group for simple query but got multiple: {}", + self.groups + .iter() + .map(|(_, measures)| format!( + "({})", + measures.iter().map(|m| m.full_name()).join(", ") + )) + .join(", ") + ))); + } + Ok(Some(self.groups.first().unwrap().0.clone())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_fixtures::cube_bridge::MockSchema; + use crate::test_fixtures::test_utils::TestContext; + + #[test] + fn test_single_fact_one_group() { + let schema = MockSchema::from_yaml_file("common/multi_fact.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let orders_count = ctx.create_symbol("orders.count").unwrap(); + let customers_name = ctx.create_symbol("customers.name").unwrap(); + + let hints = MeasuresJoinHints::builder(&JoinHints::new()) + .add_dimensions(&[customers_name]) + .build(&[orders_count]) + .unwrap(); + + let groups = MultiFactJoinGroups::try_new(ctx.query_tools().clone(), hints).unwrap(); + + assert!(!groups.is_multi_fact()); + assert_eq!(groups.num_groups(), 1); + assert!(groups.single_join().unwrap().is_some()); + } + + #[test] + fn test_multi_fact_two_groups() { + let schema = MockSchema::from_yaml_file("common/multi_fact.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let orders_count = ctx.create_symbol("orders.count").unwrap(); + let returns_count = ctx.create_symbol("returns.count").unwrap(); + let customers_name = ctx.create_symbol("customers.name").unwrap(); + + let hints = MeasuresJoinHints::builder(&JoinHints::new()) + .add_dimensions(&[customers_name]) + .build(&[orders_count, returns_count]) + .unwrap(); + + let groups = MultiFactJoinGroups::try_new(ctx.query_tools().clone(), hints).unwrap(); + + assert!(groups.is_multi_fact()); + assert_eq!(groups.num_groups(), 2); + assert!(groups.single_join().is_err()); + } + + #[test] + fn test_resolve_join_path_for_measure() { + let schema = MockSchema::from_yaml_file("common/multi_fact.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let orders_count = ctx.create_symbol("orders.count").unwrap(); + let customers_name = ctx.create_symbol("customers.name").unwrap(); + + let hints = MeasuresJoinHints::builder(&JoinHints::new()) + .add_dimensions(&[customers_name]) + .build(std::slice::from_ref(&orders_count)) + .unwrap(); + + let groups = MultiFactJoinGroups::try_new(ctx.query_tools().clone(), hints).unwrap(); + + assert_eq!( + groups.resolve_join_path_for_measure(&orders_count), + Some(&vec!["customers".to_string(), "orders".to_string()]) + ); + } + + #[test] + fn test_resolve_join_path_for_dimension() { + let schema = MockSchema::from_yaml_file("common/multi_fact.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let orders_count = ctx.create_symbol("orders.count").unwrap(); + let customers_name = ctx.create_symbol("customers.name").unwrap(); + + let hints = MeasuresJoinHints::builder(&JoinHints::new()) + .add_dimensions(std::slice::from_ref(&customers_name)) + .build(&[orders_count]) + .unwrap(); + + let groups = MultiFactJoinGroups::try_new(ctx.query_tools().clone(), hints).unwrap(); + + assert_eq!( + groups.resolve_join_path_for_dimension(&customers_name), + Some(&vec!["customers".to_string()]) + ); + } + + #[test] + fn test_resolve_join_paths_multi_fact() { + let schema = MockSchema::from_yaml_file("common/multi_fact.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let orders_count = ctx.create_symbol("orders.count").unwrap(); + let returns_count = ctx.create_symbol("returns.count").unwrap(); + let customers_name = ctx.create_symbol("customers.name").unwrap(); + + let hints = MeasuresJoinHints::builder(&JoinHints::new()) + .add_dimensions(std::slice::from_ref(&customers_name)) + .build(&[orders_count.clone(), returns_count.clone()]) + .unwrap(); + + let groups = MultiFactJoinGroups::try_new(ctx.query_tools().clone(), hints).unwrap(); + + assert_eq!( + groups.resolve_join_path_for_measure(&orders_count), + Some(&vec!["customers".to_string(), "orders".to_string()]) + ); + assert_eq!( + groups.resolve_join_path_for_measure(&returns_count), + Some(&vec!["customers".to_string(), "returns".to_string()]) + ); + assert_eq!( + groups.resolve_join_path_for_dimension(&customers_name), + Some(&vec!["customers".to_string()]) + ); + // Unknown measure + let unknown = ctx.create_symbol("customers.count").unwrap(); + assert!(groups.resolve_join_path_for_measure(&unknown).is_none()); + } +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs index 2c18af329f0dc..2b20f7f7d9c28 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/multiplied_measures_query_planner.rs @@ -53,7 +53,7 @@ impl MultipliedMeasuresQueryPlanner { .compute_join_multi_fact_groups_with_measures( &full_key_aggregate_measures.regular_measures, )?; - for (join, measures) in join_multi_fact_groups.iter() { + for (join, measures) in join_multi_fact_groups.groups() { let regular_subquery_logical_plan = self.regular_measures_subquery(measures, join.clone())?; regular_measure_subqueries.push(regular_subquery_logical_plan); @@ -73,19 +73,11 @@ impl MultipliedMeasuresQueryPlanner { let join_multi_fact_groups = self .query_properties .compute_join_multi_fact_groups_with_measures(&measures)?; - if join_multi_fact_groups.len() != 1 { - return Err(CubeError::internal( - format!( - "Expected just one multi-fact join group for aggregate measures but got multiple: {}", - join_multi_fact_groups.into_iter().map(|(_, measures)| format!("({})", measures.iter().map(|m| m.full_name()).join(", "))).join(", ") - ) - )); - } - let aggregate_subquery_logical_plan = self.aggregate_subquery_plan( - &cube_name, - &measures, - join_multi_fact_groups.into_iter().next().unwrap().0, - )?; + let join = join_multi_fact_groups.single_join()?.ok_or_else(|| { + CubeError::internal("No join groups returned for aggregate measures".to_string()) + })?; + let aggregate_subquery_logical_plan = + self.aggregate_subquery_plan(&cube_name, &measures, join)?; aggregate_multiplied_subqueries.push(aggregate_subquery_logical_plan); } if regular_measure_subqueries.is_empty() && aggregate_multiplied_subqueries.is_empty() { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs index 95891a4a3916d..ba78638d9ad1a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_properties.rs @@ -14,6 +14,7 @@ use crate::cube_bridge::base_query_options::BaseQueryOptions; use crate::cube_bridge::join_definition::JoinDefinition; use crate::cube_bridge::options_member::OptionsMember; use crate::plan::{Filter, FilterItem}; +use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups}; use crate::planner::sql_evaluator::collectors::{ collect_multiplied_measures, has_multi_stage_members, }; @@ -102,7 +103,7 @@ pub struct QueryProperties { query_tools: Rc, ignore_cumulative: bool, ungrouped: bool, - multi_fact_join_groups: Vec<(Rc, Vec>)>, + multi_fact_join_groups: MultiFactJoinGroups, pre_aggregation_query: bool, total_query: bool, query_join_hints: Rc, @@ -426,10 +427,10 @@ impl QueryProperties { order_by, row_limit, offset, + multi_fact_join_groups: MultiFactJoinGroups::empty(query_tools.clone()), query_tools, ignore_cumulative: false, ungrouped, - multi_fact_join_groups: vec![], pre_aggregation_query, total_query, query_join_hints, @@ -478,10 +479,10 @@ impl QueryProperties { order_by, row_limit, offset, + multi_fact_join_groups: MultiFactJoinGroups::empty(query_tools.clone()), query_tools, ignore_cumulative, ungrouped, - multi_fact_join_groups: vec![], pre_aggregation_query, total_query, query_join_hints, @@ -526,37 +527,25 @@ impl QueryProperties { apply_static_filter_to_symbol(&order_item.member_evaluator, &dimensions_filters)?; } - self.multi_fact_join_groups = Self::compute_join_multi_fact_groups( - self.query_join_hints.clone(), - self.query_tools.clone(), - &self.measures, - &self.dimensions, - &self.extract_dimensions_from_order(), - &self.time_dimensions, - &self.time_dimensions_filters, - &self.dimensions_filters, - &self.measures_filters, - &self.segments, - )?; + let measures_join_hints = MeasuresJoinHints::builder(&self.query_join_hints) + .add_dimensions(&self.dimensions) + .add_dimensions(&self.extract_dimensions_from_order()) + .add_dimensions(&self.time_dimensions) + .add_filters(&self.time_dimensions_filters) + .add_filters(&self.dimensions_filters) + .add_filters(&self.measures_filters) + .add_filters(&self.segments) + .build(&self.measures)?; + self.multi_fact_join_groups = + MultiFactJoinGroups::try_new(self.query_tools.clone(), measures_join_hints)?; Ok(()) } pub fn compute_join_multi_fact_groups_with_measures( &self, - measures: &Vec>, - ) -> Result, Vec>)>, CubeError> { - Self::compute_join_multi_fact_groups( - self.query_join_hints.clone(), - self.query_tools.clone(), - measures, - &self.dimensions, - &self.extract_dimensions_from_order(), - &self.time_dimensions, - &self.time_dimensions_filters, - &self.dimensions_filters, - &self.measures_filters, - &self.segments, - ) + measures: &[Rc], + ) -> Result { + self.multi_fact_join_groups.for_measures(measures) } pub fn is_total_query(&self) -> bool { @@ -576,130 +565,12 @@ impl QueryProperties { .collect() } - pub fn compute_join_multi_fact_groups( - query_join_hints: Rc, - query_tools: Rc, - measures: &Vec>, - dimensions: &Vec>, - order_dimensions: &Vec>, - time_dimensions: &Vec>, - time_dimensions_filters: &Vec, - dimensions_filters: &Vec, - measures_filters: &Vec, - segments: &Vec, - ) -> Result, Vec>)>, CubeError> { - let dimensions_join_hints = query_tools - .cached_data_mut() - .join_hints_for_member_symbol_vec(&dimensions)?; - let order_dimensions_join_hints = query_tools - .cached_data_mut() - .join_hints_for_member_symbol_vec(&order_dimensions)?; - let time_dimensions_join_hints = query_tools - .cached_data_mut() - .join_hints_for_member_symbol_vec(&time_dimensions)?; - let time_dimensions_filters_join_hints = query_tools - .cached_data_mut() - .join_hints_for_filter_item_vec(&time_dimensions_filters)?; - let dimensions_filters_join_hints = query_tools - .cached_data_mut() - .join_hints_for_filter_item_vec(&dimensions_filters)?; - let segments_join_hints = query_tools - .cached_data_mut() - .join_hints_for_filter_item_vec(&segments)?; - let measures_filters_join_hints = query_tools - .cached_data_mut() - .join_hints_for_filter_item_vec(&measures_filters)?; - - let mut dimension_and_filter_join_hints_concat = vec![query_join_hints]; - - dimension_and_filter_join_hints_concat.extend(dimensions_join_hints.into_iter()); - dimension_and_filter_join_hints_concat.extend(order_dimensions_join_hints.into_iter()); - dimension_and_filter_join_hints_concat.extend(time_dimensions_join_hints.into_iter()); - dimension_and_filter_join_hints_concat - .extend(time_dimensions_filters_join_hints.into_iter()); - dimension_and_filter_join_hints_concat.extend(dimensions_filters_join_hints.into_iter()); - dimension_and_filter_join_hints_concat.extend(segments_join_hints.into_iter()); - // TODO This is not quite correct. Decide on how to handle it. Keeping it here just to blow up on unsupported case - dimension_and_filter_join_hints_concat.extend(measures_filters_join_hints.into_iter()); - let dimension_and_filter_join_hints_concat = dimension_and_filter_join_hints_concat - .into_iter() - .filter(|v| !v.is_empty()) - .collect_vec(); - - let mut filtered_measures = Vec::new(); - for m in measures { - if !has_multi_stage_members(&m, true)? { - filtered_measures.push(m.clone()); - } - } - let measures = filtered_measures; - - let measures_to_join = if measures.is_empty() { - if dimension_and_filter_join_hints_concat.is_empty() { - vec![] - } else { - let join = query_tools - .cached_data_mut() - .join_by_hints(dimension_and_filter_join_hints_concat.clone(), |hints| { - query_tools.base_tools().join_tree_for_hints(hints) - })?; - vec![(Vec::new(), join)] - } - } else { - measures - .iter() - .map(|m| -> Result<_, CubeError> { - let measure_join_hints = - query_tools.cached_data_mut().join_hints_for_member(m)?; - let join = query_tools.cached_data_mut().join_by_hints( - vec![measure_join_hints] - .into_iter() - .chain(dimension_and_filter_join_hints_concat.clone().into_iter()) - .collect::>(), - |hints| query_tools.base_tools().join_tree_for_hints(hints), - )?; - Ok((vec![m.clone()], join)) - }) - .collect::, _>>()? - }; - let res = measures_to_join - .into_iter() - .into_group_map_by(|(_, (key, _))| key.clone()) - .into_values() - .map(|measures_and_join| { - ( - measures_and_join.first().unwrap().1 .1.clone(), - measures_and_join - .into_iter() - .flat_map(|m| m.0) - .collect::>(), - ) - }) - .collect_vec(); - Ok(res) - } - pub fn is_multi_fact_join(&self) -> bool { - self.multi_fact_join_groups.len() > 1 + self.multi_fact_join_groups.is_multi_fact() } pub fn simple_query_join(&self) -> Result>, CubeError> { - if self.multi_fact_join_groups.is_empty() { - return Ok(None); - } - if self.multi_fact_join_groups.len() > 1 { - return Err(CubeError::internal(format!( - "Expected just one multi-fact join group for simple query but got multiple: {}", - self.multi_fact_join_groups - .iter() - .map(|(_, measures)| format!( - "({})", - measures.iter().map(|m| m.full_name()).join(", ") - )) - .join(", ") - ))); - } - Ok(Some(self.multi_fact_join_groups.first().unwrap().0.clone())) + self.multi_fact_join_groups.single_join() } pub fn measures(&self) -> &Vec> { @@ -910,11 +781,9 @@ impl QueryProperties { result.multi_stage_measures.push(m.clone()) } else { let join = self - .compute_join_multi_fact_groups_with_measures(&vec![m.clone()])? - .first() - .expect("No join groups returned for single measure multi-fact join group") - .0 - .clone(); + .compute_join_multi_fact_groups_with_measures(std::slice::from_ref(m))? + .single_join()? + .expect("No join groups returned for single measure multi-fact join group"); for item in collect_multiplied_measures(m, join)? { if item.multiplied { result diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs index c2bd4e6299127..f79d0912b3f8a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs @@ -1,106 +1,27 @@ -use super::sql_evaluator::{Compiler, MemberSymbol}; +use super::sql_evaluator::Compiler; use super::ParamsAllocator; use crate::cube_bridge::base_tools::BaseTools; use crate::cube_bridge::evaluator::CubeEvaluator; use crate::cube_bridge::join_definition::JoinDefinition; use crate::cube_bridge::join_graph::JoinGraph; -use crate::cube_bridge::join_hints::JoinHintItem; use crate::cube_bridge::join_item::JoinItemStatic; use crate::cube_bridge::security_context::SecurityContext; use crate::cube_bridge::sql_templates_render::SqlTemplatesRender; -use crate::plan::FilterItem; use crate::planner::join_hints::JoinHints; -use crate::planner::sql_evaluator::collectors::collect_join_hints; use crate::planner::sql_templates::PlanSqlTemplates; use chrono_tz::Tz; use cubenativeutils::CubeError; use itertools::Itertools; -use std::cell::{Ref, RefCell, RefMut}; -use std::collections::{HashMap, HashSet}; +use std::cell::RefCell; +use std::collections::HashSet; use std::rc::Rc; -pub struct QueryToolsCachedData { - join_hints: HashMap>, - join_hints_to_join_key: HashMap>, Rc>, - join_key_to_join: HashMap, Rc>, -} - -#[derive(Debug, Hash, PartialEq, Eq)] +#[derive(Debug, Hash, PartialEq, Eq, Clone)] pub struct JoinKey { root: String, joins: Vec, } -impl QueryToolsCachedData { - pub fn new() -> Self { - Self { - join_hints: HashMap::new(), - join_hints_to_join_key: HashMap::new(), - join_key_to_join: HashMap::new(), - } - } - - pub fn join_hints_for_member( - &mut self, - node: &Rc, - ) -> Result, CubeError> { - let full_name = node.full_name(); - if let Some(val) = self.join_hints.get(&full_name) { - Ok(val.clone()) - } else { - let join_hints = Rc::new(collect_join_hints(node)?); - self.join_hints.insert(full_name, join_hints.clone()); - Ok(join_hints) - } - } - - pub fn join_hints_for_member_symbol_vec( - &mut self, - vec: &Vec>, - ) -> Result>, CubeError> { - vec.iter() - .map(|b| self.join_hints_for_member(b)) - .collect::, _>>() - } - - pub fn join_hints_for_filter_item_vec( - &mut self, - vec: &Vec, - ) -> Result>, CubeError> { - let mut member_symbols = Vec::new(); - for i in vec.iter() { - i.find_all_member_evaluators(&mut member_symbols); - } - member_symbols - .iter() - .map(|b| self.join_hints_for_member(b)) - .collect::, _>>() - } - - pub fn join_by_hints( - &mut self, - hints: Vec>, - join_fn: impl FnOnce(Vec) -> Result, CubeError>, - ) -> Result<(Rc, Rc), CubeError> { - if let Some(key) = self.join_hints_to_join_key.get(&hints) { - Ok((key.clone(), self.join_key_to_join.get(key).unwrap().clone())) - } else { - let join = join_fn(hints.iter().flat_map(|h| h.iter().cloned()).collect())?; - let join_key = Rc::new(JoinKey { - root: join.static_data().root.to_string(), - joins: join - .joins()? - .iter() - .map(|i| i.static_data().clone()) - .collect(), - }); - self.join_hints_to_join_key.insert(hints, join_key.clone()); - self.join_key_to_join.insert(join_key.clone(), join.clone()); - Ok((join_key, join)) - } - } -} - pub struct QueryTools { cube_evaluator: Rc, base_tools: Rc, @@ -108,7 +29,6 @@ pub struct QueryTools { templates_render: Rc, params_allocator: Rc>, evaluator_compiler: Rc>, - cached_data: RefCell, timezone: Tz, masked_members: HashSet, } @@ -144,7 +64,6 @@ impl QueryTools { templates_render, params_allocator: Rc::new(RefCell::new(ParamsAllocator::new(export_annotated_sql))), evaluator_compiler, - cached_data: RefCell::new(QueryToolsCachedData::new()), timezone, masked_members: masked_members.unwrap_or_default().into_iter().collect(), })) @@ -175,12 +94,22 @@ impl QueryTools { self.timezone } - pub fn cached_data(&self) -> Ref<'_, QueryToolsCachedData> { - self.cached_data.borrow() - } - - pub fn cached_data_mut(&self) -> RefMut<'_, QueryToolsCachedData> { - self.cached_data.borrow_mut() + pub fn join_for_hints( + &self, + hints: &JoinHints, + ) -> Result<(JoinKey, Rc), CubeError> { + let join = self + .base_tools + .join_tree_for_hints(hints.items().to_vec())?; + let join_key = JoinKey { + root: join.static_data().root.to_string(), + joins: join + .joins()? + .iter() + .map(|i| i.static_data().clone()) + .collect(), + }; + Ok((join_key, join)) } pub fn evaluator_compiler(&self) -> &Rc> { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs index 5c25f368d441f..7988495b24d81 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs @@ -18,19 +18,12 @@ use cubenativeutils::CubeError; use std::collections::HashMap; use std::rc::Rc; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -enum CacheSymbolType { - Dimension, - Measure, - Segment, -} - pub struct Compiler { cube_evaluator: Rc, base_tools: Rc, security_context: Rc, timezone: Tz, - members: HashMap<(CacheSymbolType, String), Rc>, + members: HashMap>, cube_names: HashMap, Rc>, cube_tables: HashMap, Rc>, } @@ -81,12 +74,12 @@ impl Compiler { &mut self, path: SymbolPath, ) -> Result, CubeError> { - if let Some(exists) = self.exists_member(CacheSymbolType::Measure, &path.cache_name()) { + if let Some(exists) = self.members.get(&path) { Ok(exists.clone()) } else { - let result = - MeasureSymbolFactory::try_new(path, self.cube_evaluator.clone())?.build(self)?; - self.validate_and_cache_result(CacheSymbolType::Measure, result.clone())?; + let result = MeasureSymbolFactory::try_new(path.clone(), self.cube_evaluator.clone())? + .build(self)?; + self.validate_and_cache_result(path, result.clone())?; Ok(result) } } @@ -110,12 +103,13 @@ impl Compiler { &mut self, path: SymbolPath, ) -> Result, CubeError> { - if let Some(exists) = self.exists_member(CacheSymbolType::Dimension, &path.cache_name()) { + if let Some(exists) = self.members.get(&path) { Ok(exists.clone()) } else { let result = - DimensionSymbolFactory::try_new(path, self.cube_evaluator.clone())?.build(self)?; - self.validate_and_cache_result(CacheSymbolType::Dimension, result.clone())?; + DimensionSymbolFactory::try_new(path.clone(), self.cube_evaluator.clone())? + .build(self)?; + self.validate_and_cache_result(path, result.clone())?; Ok(result) } } @@ -129,11 +123,11 @@ impl Compiler { &mut self, path: SymbolPath, ) -> Result, CubeError> { - let full_name = path.full_name().clone(); - if let Some(exists) = self.exists_member(CacheSymbolType::Segment, &full_name) { + if let Some(exists) = self.members.get(&path) { return Ok(exists.clone()); } - let definition = self.cube_evaluator.segment_by_path(full_name.clone())?; + let full_name = path.full_name().clone(); + let definition = self.cube_evaluator.segment_by_path(full_name)?; let sql_call = self.compile_sql_call(path.cube_name(), definition.sql()?)?; let alias = PlanSqlTemplates::member_alias_name(path.cube_name(), path.symbol_name(), &None); @@ -147,8 +141,7 @@ impl Compiler { path.path().clone(), )?; let result = MemberSymbol::new_member_expression(symbol); - let key = (CacheSymbolType::Segment, full_name); - self.members.insert(key, result.clone()); + self.members.insert(path, result.clone()); Ok(result) } @@ -213,23 +206,13 @@ impl Compiler { Ok(Rc::new(sql_call)) } - fn exists_member( - &self, - symbol_type: CacheSymbolType, - full_name: &String, - ) -> Option> { - let key = (symbol_type, full_name.clone()); - self.members.get(&key).cloned() - } - fn validate_and_cache_result( &mut self, - symbol_type: CacheSymbolType, + path: SymbolPath, node: Rc, ) -> Result<(), CubeError> { node.validate()?; - let key = (symbol_type, node.full_name().clone()); - self.members.insert(key, node.clone()); + self.members.insert(path, node); Ok(()) } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/common/symbol_path.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/common/symbol_path.rs index 0a595851443cc..29f28fb0d56bc 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/common/symbol_path.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/common/symbol_path.rs @@ -1,10 +1,11 @@ +use std::hash::{Hash, Hasher}; use std::rc::Rc; use cubenativeutils::CubeError; use crate::cube_bridge::evaluator::CubeEvaluator; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum SymbolPathType { Dimension, Measure, @@ -216,6 +217,26 @@ impl SymbolPath { } } +impl PartialEq for SymbolPath { + fn eq(&self, other: &Self) -> bool { + self.path_type == other.path_type + && self.full_name == other.full_name + && self.granularity == other.granularity + && self.path == other.path + } +} + +impl Eq for SymbolPath {} + +impl Hash for SymbolPath { + fn hash(&self, state: &mut H) { + self.path_type.hash(state); + self.full_name.hash(state); + self.granularity.hash(state); + self.path.hash(state); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_schema.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_schema.rs index 9702c8213ac4f..440c452209f90 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_schema.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/mock_schema.rs @@ -416,6 +416,7 @@ impl MockCubeBuilder { pub struct ViewCube { pub join_path: String, pub includes: Vec, + pub prefix: bool, } pub struct MockViewBuilder { @@ -432,6 +433,21 @@ impl MockViewBuilder { self.view_cubes.push(ViewCube { join_path: join_path.into(), includes, + prefix: false, + }); + self + } + + pub fn include_cube_with_prefix( + mut self, + join_path: impl Into, + includes: Vec, + prefix: bool, + ) -> Self { + self.view_cubes.push(ViewCube { + join_path: join_path.into(), + includes, + prefix, }); self } @@ -484,20 +500,29 @@ impl MockViewBuilder { view_cube.includes.clone() }; + let make_view_name = |member_name: &str| -> String { + if view_cube.prefix { + format!("{}_{}", target_cube_name, member_name) + } else { + member_name.to_string() + } + }; + for member_name in &members_to_include { if let Some(dimension) = source_cube.dimensions.get(member_name) { let view_member_sql = format!("{{{}.{}}}", view_cube.join_path, member_name); + let view_name = make_view_name(member_name); - if all_dimensions.contains_key(member_name) { + if all_dimensions.contains_key(&view_name) { panic!( "Duplicate member '{}' in view '{}'. Members must be unique.", - member_name, self.view_name + view_name, self.view_name ); } all_dimensions.insert( - member_name.clone(), + view_name, Rc::new( MockDimensionDefinition::builder() .dimension_type(dimension.static_data().dimension_type.clone()) @@ -512,16 +537,17 @@ impl MockViewBuilder { if let Some(measure) = source_cube.measures.get(member_name) { let view_member_sql = format!("{{{}.{}}}", view_cube.join_path, member_name); + let view_name = make_view_name(member_name); - if all_measures.contains_key(member_name) { + if all_measures.contains_key(&view_name) { panic!( "Duplicate member '{}' in view '{}'. Members must be unique.", - member_name, self.view_name + view_name, self.view_name ); } all_measures.insert( - member_name.clone(), + view_name, Rc::new( MockMeasureDefinition::builder() .measure_type(measure.static_data().measure_type.clone()) @@ -536,16 +562,17 @@ impl MockViewBuilder { if source_cube.segments.contains_key(member_name) { let view_member_sql = format!("{{{}.{}}}", view_cube.join_path, member_name); + let view_name = make_view_name(member_name); - if all_segments.contains_key(member_name) { + if all_segments.contains_key(&view_name) { panic!( "Duplicate member '{}' in view '{}'. Members must be unique.", - member_name, self.view_name + view_name, self.view_name ); } all_segments.insert( - member_name.clone(), + view_name, Rc::new( MockSegmentDefinition::builder() .sql(view_member_sql) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs index 843813f6a820f..880c70ec9dde6 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs @@ -36,6 +36,8 @@ pub struct YamlBaseQueryOptions { pub disable_external_pre_aggregations: Option, #[serde(default)] pub pre_aggregation_id: Option, + #[serde(default, rename = "joinHints")] + pub join_hints: Option>>, } #[derive(Debug, Deserialize)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/schema.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/schema.rs index 047a3ae6f732d..43173a7eebd7d 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/schema.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/schema.rs @@ -86,6 +86,8 @@ struct YamlViewCube { join_path: String, #[serde(default)] includes: Option, + #[serde(default)] + prefix: Option, } impl YamlSchema { @@ -156,7 +158,9 @@ impl YamlSchema { Some(YamlIncludes::List(list)) => list, _ => vec![], }; - view_builder = view_builder.include_cube(view_cube.join_path, includes); + let prefix = view_cube.prefix.unwrap_or(false); + view_builder = + view_builder.include_cube_with_prefix(view_cube.join_path, includes, prefix); } builder = view_builder.finish_view(); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_fact.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_fact.yaml new file mode 100644 index 0000000000000..d8253b91e6d1d --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_fact.yaml @@ -0,0 +1,73 @@ +cubes: + - name: customers + sql: "SELECT * FROM customers" + joins: + - name: orders + relationship: one_to_many + sql: "{customers}.id = {orders.customer_id}" + - name: returns + relationship: one_to_many + sql: "{customers}.id = {returns.customer_id}" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: name + type: string + sql: name + - name: city + type: string + sql: city + measures: + - name: count + type: count + sql: "COUNT(*)" + + - name: orders + sql: "SELECT * FROM orders" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: customer_id + type: number + sql: customer_id + - name: status + type: string + sql: status + - name: created_at + type: time + sql: created_at + measures: + - name: count + type: count + sql: "COUNT(*)" + - name: total_amount + type: sum + sql: amount + + - name: returns + sql: "SELECT * FROM returns" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: customer_id + type: number + sql: customer_id + - name: reason + type: string + sql: reason + - name: created_at + type: time + sql: created_at + measures: + - name: count + type: count + sql: "COUNT(*)" + - name: total_refund + type: sum + sql: refund_amount diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multiple_join_paths.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multiple_join_paths.yaml new file mode 100644 index 0000000000000..1733ecb37c35b --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multiple_join_paths.yaml @@ -0,0 +1,93 @@ +cubes: + - name: A + sql: "SELECT 1 AS a_id" + joins: + - name: B + relationship: many_to_one + sql: "'A' = 'B'" + - name: D + relationship: many_to_one + sql: "'A' = 'D'" + - name: F + relationship: many_to_one + sql: "'A' = 'F'" + dimensions: + - name: a_id + type: number + sql: a_id + primary_key: true + measures: + - name: count + type: count + sql: "COUNT(*)" + + - name: B + sql: "SELECT 1 AS b_id" + joins: + - name: C + relationship: many_to_one + sql: "'B' = 'C'" + dimensions: + - name: b_id + type: number + sql: b_id + primary_key: true + + - name: C + sql: "SELECT 1 AS c_id" + joins: + - name: X + relationship: many_to_one + sql: "'C' = 'X'" + dimensions: + - name: c_id + type: number + sql: c_id + primary_key: true + + - name: D + sql: "SELECT 1 AS d_id" + joins: + - name: E + relationship: many_to_one + sql: "'D' = 'E'" + dimensions: + - name: d_id + type: number + sql: d_id + primary_key: true + + - name: E + sql: "SELECT 1 AS e_id" + joins: + - name: X + relationship: many_to_one + sql: "'E' = 'X'" + dimensions: + - name: e_id + type: number + sql: e_id + primary_key: true + + - name: F + sql: "SELECT 1 AS f_id" + joins: + - name: X + relationship: many_to_one + sql: "'F' = 'X'" + dimensions: + - name: f_id + type: number + sql: f_id + primary_key: true + + - name: X + sql: "SELECT 1 AS x_id, 'foo' AS x_name" + dimensions: + - name: x_id + type: number + sql: x_id + primary_key: true + - name: x_name + type: string + sql: x_name diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/rollup_join_calculated_measures.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/rollup_join_calculated_measures.yaml new file mode 100644 index 0000000000000..40da0da3dcfbd --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/rollup_join_calculated_measures.yaml @@ -0,0 +1,115 @@ +cubes: + - name: facts + sql: "SELECT * FROM visitor_checkins" + + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: line_item_id + type: number + sql: visitor_id + - name: day + type: time + sql: created_at + measures: + - name: count + type: count + sql: "COUNT(*)" + - name: total_cost + type: sum + sql: id + - name: avg_cost + type: number + sql: "{CUBE.total_cost} / {CUBE.count}" + pre_aggregations: + - name: facts_rollup + type: rollup + measures: + - count + - total_cost + - avg_cost + dimensions: + - line_item_id + time_dimension: day + granularity: day + + - name: line_items + sql: "SELECT * FROM visitors" + + joins: + - name: facts + relationship: one_to_many + sql: "{CUBE.id} = {facts.line_item_id}" + - name: campaigns + relationship: many_to_one + sql: "{CUBE.id} = {campaigns.id}" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: name + type: string + sql: source + measures: + - name: count + type: count + sql: "COUNT(*)" + pre_aggregations: + - name: li_rollup + type: rollup + dimensions: + - id + - name + - name: combined_rollup_join + type: rollupJoin + measures: + - facts.count + - facts.total_cost + - facts.avg_cost + dimensions: + - name + - campaigns.campaign_name + time_dimension: facts.day + granularity: day + rollups: + - campaigns.campaigns_rollup + - facts.facts_rollup + - line_items.li_rollup + + - name: campaigns + sql: "SELECT 1 as id, 'camp1' as campaign_name" + + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: campaign_name + type: string + sql: campaign_name + measures: + - name: count + type: count + sql: "COUNT(*)" + pre_aggregations: + - name: campaigns_rollup + type: rollup + dimensions: + - id + - campaign_name + +views: + - name: my_view + cubes: + - join_path: line_items.facts + includes: "*" + prefix: true + - join_path: line_items + includes: "*" + prefix: true + - join_path: line_items.campaigns + includes: "*" + prefix: true diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs index 2df7704643faa..523133718b9da 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs @@ -1,4 +1,5 @@ use crate::cube_bridge::base_query_options::BaseQueryOptions; +use crate::cube_bridge::join_hints::JoinHintItem; use crate::logical_plan::PreAggregation; use crate::planner::filter::base_segment::BaseSegment; use crate::planner::query_tools::QueryTools; @@ -239,6 +240,19 @@ impl TestContext { }) .filter(|td| !td.is_empty()); + let join_hints = yaml_options.join_hints.map(|hints| { + hints + .into_iter() + .map(|path| { + if path.len() == 1 { + JoinHintItem::Single(path.into_iter().next().unwrap()) + } else { + JoinHintItem::Vector(path) + } + }) + .collect::>() + }); + Rc::new( MockBaseQueryOptions::builder() .cube_evaluator(self.query_tools.cube_evaluator().clone()) @@ -251,6 +265,7 @@ impl TestContext { .time_dimensions(time_dimensions) .order(order) .filters(filters) + .join_hints(join_hints) .limit(yaml_options.limit) .row_limit(yaml_options.row_limit) .offset(yaml_options.offset) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs index 77efa96d8421c..ce038685e2713 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs @@ -184,6 +184,26 @@ fn test_segment_as_dimension_in_pre_aggregation_query() { insta::assert_snapshot!(sql); } +#[test] +fn test_multi_fact_two_measures_from_different_cubes() { + let schema = MockSchema::from_yaml_file("common/multi_fact.yaml"); + let test_context = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {" + measures: + - orders.count + - returns.count + dimensions: + - customers.name + "}; + + let sql = test_context + .build_sql(query_yaml) + .expect("Should generate SQL for multi-fact query"); + + insta::assert_snapshot!(sql); +} + #[test] fn test_measure_switch_cross_join() { let schema = MockSchema::from_yaml_file("common/calc_groups.yaml"); @@ -216,3 +236,48 @@ fn test_measure_switch_cross_join() { insta::assert_snapshot!(sql); } + +#[test] +fn test_query_level_join_hints() { + let schema = MockSchema::from_yaml_file("common/multiple_join_paths.yaml"); + let test_context = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {" + dimensions: + - A.a_id + - X.x_name + joinHints: + - [A, D] + - [D, E] + - [E, X] + "}; + + let sql = test_context + .build_sql(query_yaml) + .expect("Should generate SQL with join hints"); + + assert!( + sql.contains("'A' = 'D'"), + "SQL should use A->D join, got: {sql}" + ); + assert!( + sql.contains("'D' = 'E'"), + "SQL should use D->E join, got: {sql}" + ); + assert!( + sql.contains("'E' = 'X'"), + "SQL should use E->X join, got: {sql}" + ); + assert!( + !sql.contains("'A' = 'B'"), + "SQL should NOT use A->B join, got: {sql}" + ); + assert!( + !sql.contains("'B' = 'C'"), + "SQL should NOT use B->C join, got: {sql}" + ); + assert!( + !sql.contains("'A' = 'F'"), + "SQL should NOT use A->F join, got: {sql}" + ); +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs index f0d2e327dfe74..1f3dbe4ecd08b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs @@ -651,3 +651,49 @@ fn test_segment_with_coarser_granularity() { insta::assert_snapshot!(sql); } + +// --- rollupJoin with calculated measures through view --- + +#[test] +fn test_rollup_join_calculated_measures_through_view() { + let schema = MockSchema::from_yaml_file("common/rollup_join_calculated_measures.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let (sql, pre_aggrs) = ctx + .build_sql_with_used_pre_aggregations(indoc! {" + measures: + - my_view.facts_avg_cost + time_dimensions: + - dimension: my_view.facts_day + granularity: day + "}) + .unwrap(); + + let pre_agg_names: Vec<_> = pre_aggrs + .iter() + .map(|pa| format!("{}.{}", pa.cube_name(), pa.name())) + .collect(); + assert!( + pre_agg_names + .iter() + .any(|n| n == "line_items.combined_rollup_join"), + "Should use combined_rollup_join, got: {:?}\nSQL:\n{}", + pre_agg_names, + sql + ); + assert!( + sql.contains("campaigns_rollup"), + "SQL should reference campaigns_rollup, got:\n{}", + sql + ); + assert!( + sql.contains("facts_rollup"), + "SQL should reference facts_rollup, got:\n{}", + sql + ); + assert!( + sql.contains("li_rollup"), + "SQL should reference li_rollup, got:\n{}", + sql + ); +} diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__common_sql_generation__multi_fact_two_measures_from_different_cubes.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__common_sql_generation__multi_fact_two_measures_from_different_cubes.snap new file mode 100644 index 0000000000000..abd6929f82b80 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__common_sql_generation__multi_fact_two_measures_from_different_cubes.snap @@ -0,0 +1,24 @@ +--- +source: cubesqlplanner/src/tests/common_sql_generation.rs +expression: sql +--- +SELECT "fk_aggregate_keys"."customers__name" "customers__name", "q_0"."orders__count" "orders__count", "q_1"."returns__count" "returns__count" +FROM (SELECT DISTINCT "customers__name" "customers__name" +FROM (SELECT "customers".name "customers__name" +FROM customers AS "customers" +LEFT JOIN orders AS "orders" ON "customers".id = "orders".customer_id +GROUP BY 1 + UNION ALL +SELECT "customers".name "customers__name" +FROM customers AS "customers" +LEFT JOIN returns AS "returns" ON "customers".id = "returns".customer_id +GROUP BY 1) AS "pk_aggregate_keys_source") AS "fk_aggregate_keys" +LEFT JOIN (SELECT "customers".name "customers__name", count(COUNT(*)) "orders__count" +FROM customers AS "customers" +LEFT JOIN orders AS "orders" ON "customers".id = "orders".customer_id +GROUP BY 1) AS "q_0" ON (("fk_aggregate_keys"."customers__name" = "q_0"."customers__name" OR (("fk_aggregate_keys"."customers__name" IS NULL) AND ("q_0"."customers__name" IS NULL)))) +LEFT JOIN (SELECT "customers".name "customers__name", count(COUNT(*)) "returns__count" +FROM customers AS "customers" +LEFT JOIN returns AS "returns" ON "customers".id = "returns".customer_id +GROUP BY 1) AS "q_1" ON (("fk_aggregate_keys"."customers__name" = "q_1"."customers__name" OR (("fk_aggregate_keys"."customers__name" IS NULL) AND ("q_1"."customers__name" IS NULL)))) +ORDER BY 2 DESC From d5f635105de2e4e881f5ec8b6a073cb62b031295 Mon Sep 17 00:00:00 2001 From: waralexrom <108349432+waralexrom@users.noreply.github.com> Date: Tue, 17 Mar 2026 16:53:48 +0100 Subject: [PATCH 4/7] fix(tesseract): Tesseract should respect sql api aliases (#10505) --- .../src/adapter/BaseQuery.js | 1 + .../postgres/sql-generation.test.ts | 48 +++++++++++++++++++ .../src/cube_bridge/base_query_options.rs | 3 ++ .../cubesqlplanner/src/planner/base_query.rs | 1 + .../cubesqlplanner/src/planner/query_tools.rs | 4 +- .../src/planner/sql_evaluator/compiler.rs | 16 +++++-- .../sql_evaluator/symbols/dimension_symbol.rs | 14 ++++-- .../sql_evaluator/symbols/measure_symbol.rs | 14 ++++-- .../cube_bridge/base_query_options.rs | 6 ++- .../cube_bridge/yaml/base_query_options.rs | 7 +++ .../src/test_fixtures/schemas/mod.rs | 2 +- .../test_fixtures/test_utils/test_context.rs | 41 +++++++++++++--- .../src/tests/common_sql_generation.rs | 37 ++++++++++++++ 13 files changed, 171 insertions(+), 23 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 7a5b16fa766b1..f1137a4d607f0 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -952,6 +952,7 @@ export class BaseQuery { cubestoreSupportMultistage: this.options.cubestoreSupportMultistage ?? getEnv('cubeStoreRollingWindowJoin'), disableExternalPreAggregations: !!this.options.disableExternalPreAggregations, maskedMembers: this.options.maskedMembers, + memberToAlias: this.options.memberToAlias, }; try { diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts index c65bbe81f7d62..010bc1ae59b9d 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/sql-generation.test.ts @@ -1010,6 +1010,54 @@ SELECT 1 AS revenue, cast('2024-01-01' AS timestamp) as time UNION ALL visitors__str_case: 'More than 1' }])); + it('simple join member to alias', async () => runQueryTest({ + measures: [ + 'visitors.visitor_revenue', + 'visitors.visitor_count', + 'visitors.per_visitor_revenue' + ], + dimensions: [ + 'visitors.source' + ], + timeDimensions: [{ + dimension: 'visitors.created_at', + dateRange: ['2017-01-01', '2017-01-30'], + granularity: 'month' + }], + timezone: 'America/Los_Angeles', + memberToAlias: { + 'visitors.visitor_revenue': 'custom_revenue', + 'visitors.visitor_count': 'custom_count', + 'visitors.source': 'custom_source', + 'visitors.created_at': 'custom_created_at', + }, + order: [] + }, + + [ + { + custom_source: 'google', + custom_created_at_month: '2017-01-01T00:00:00.000Z', + custom_revenue: null, + custom_count: '1', + visitors__per_visitor_revenue: null + }, + { + custom_source: 'some', + custom_created_at_month: '2017-01-01T00:00:00.000Z', + custom_revenue: '300', + custom_count: '2', + visitors__per_visitor_revenue: '150' + }, + { + custom_source: null, + custom_created_at_month: '2017-01-01T00:00:00.000Z', + custom_revenue: null, + custom_count: '2', + visitors__per_visitor_revenue: null + } + ])); + it('running total', async () => { await compiler.compile(); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs index 3196071960449..1d58958f32e20 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/cube_bridge/base_query_options.rs @@ -11,6 +11,7 @@ use cubenativeutils::wrappers::{NativeArray, NativeContextHolder, NativeObjectHa use cubenativeutils::CubeError; use serde::{Deserialize, Serialize}; use std::any::Any; +use std::collections::HashMap; use std::rc::Rc; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -75,6 +76,8 @@ pub struct BaseQueryOptionsStatic { pub pre_aggregation_id: Option, #[serde(rename = "maskedMembers")] pub masked_members: Option>, + #[serde(rename = "memberToAlias", default)] + pub member_to_alias: Option>, } #[nativebridge::native_bridge(BaseQueryOptionsStatic)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 3caadc6dbe3af..27651cc87a0ef 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -35,6 +35,7 @@ impl BaseQuery { options.static_data().timezone.clone(), options.static_data().export_annotated_sql, options.static_data().masked_members.clone(), + options.static_data().member_to_alias.clone(), )?; let request = QueryProperties::try_new(query_tools.clone(), options)?; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs index f79d0912b3f8a..74fc677a7129f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/query_tools.rs @@ -13,7 +13,7 @@ use chrono_tz::Tz; use cubenativeutils::CubeError; use itertools::Itertools; use std::cell::RefCell; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::rc::Rc; #[derive(Debug, Hash, PartialEq, Eq, Clone)] @@ -42,6 +42,7 @@ impl QueryTools { timezone_name: Option, export_annotated_sql: bool, masked_members: Option>, + member_to_alias: Option>, ) -> Result, CubeError> { let templates_render = base_tools.sql_templates()?; let timezone = if let Some(timezone) = timezone_name { @@ -56,6 +57,7 @@ impl QueryTools { base_tools.clone(), security_context.clone(), timezone.clone(), + member_to_alias, ))); Ok(Rc::new(Self { cube_evaluator, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs index 7988495b24d81..38ffd5dde9619 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/compiler.rs @@ -23,6 +23,7 @@ pub struct Compiler { base_tools: Rc, security_context: Rc, timezone: Tz, + member_to_alias: Option>, members: HashMap>, cube_names: HashMap, Rc>, cube_tables: HashMap, Rc>, @@ -34,12 +35,14 @@ impl Compiler { base_tools: Rc, security_context: Rc, timezone: Tz, + member_to_alias: Option>, ) -> Self { Self { cube_evaluator, security_context, base_tools, timezone, + member_to_alias, members: HashMap::new(), cube_names: HashMap::new(), cube_tables: HashMap::new(), @@ -127,10 +130,11 @@ impl Compiler { return Ok(exists.clone()); } let full_name = path.full_name().clone(); - let definition = self.cube_evaluator.segment_by_path(full_name)?; + let definition = self.cube_evaluator.segment_by_path(full_name.clone())?; let sql_call = self.compile_sql_call(path.cube_name(), definition.sql()?)?; - let alias = - PlanSqlTemplates::member_alias_name(path.cube_name(), path.symbol_name(), &None); + let alias = self.alias_for_member(&full_name).unwrap_or_else(|| { + PlanSqlTemplates::member_alias_name(path.cube_name(), path.symbol_name(), &None) + }); let cube_symbol = self.add_cube_table_evaluator(path.cube_name().clone(), vec![])?; let symbol = MemberExpressionSymbol::try_new( cube_symbol, @@ -191,6 +195,12 @@ impl Compiler { self.timezone.clone() } + pub fn alias_for_member(&self, full_name: &str) -> Option { + self.member_to_alias + .as_ref() + .and_then(|m| m.get(full_name).cloned()) + } + pub fn compile_sql_call( &mut self, cube_name: &String, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/dimension_symbol.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/dimension_symbol.rs index 1cfc80bb8960f..56d71dcf7d6e3 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/dimension_symbol.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/dimension_symbol.rs @@ -387,11 +387,15 @@ impl SymbolFactory for DimensionSymbolFactory { }; let cube = cube_evaluator.cube_from_path(path.cube_name().clone())?; - let alias = PlanSqlTemplates::member_alias_name( - cube.static_data().resolved_alias(), - path.symbol_name(), - &None, - ); + let alias = compiler + .alias_for_member(path.full_name()) + .unwrap_or_else(|| { + PlanSqlTemplates::member_alias_name( + cube.static_data().resolved_alias(), + path.symbol_name(), + &None, + ) + }); let is_view = cube.static_data().is_view.unwrap_or(false); let is_calendar = cube.static_data().is_calendar.unwrap_or(false); let mut is_self_time_shift_pk = false; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs index 8b6632fa0c56b..9a50f3f3dd561 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/sql_evaluator/symbols/measure_symbol.rs @@ -710,11 +710,15 @@ impl SymbolFactory for MeasureSymbolFactory { }; let cube = cube_evaluator.cube_from_path(path.cube_name().clone())?; - let alias = PlanSqlTemplates::member_alias_name( - cube.static_data().resolved_alias(), - path.symbol_name(), - &None, - ); + let alias = compiler + .alias_for_member(path.full_name()) + .unwrap_or_else(|| { + PlanSqlTemplates::member_alias_name( + cube.static_data().resolved_alias(), + path.symbol_name(), + &None, + ) + }); let is_view = cube.static_data().is_view.unwrap_or(false); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/base_query_options.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/base_query_options.rs index eb30dbddf728f..0443af98aef5b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/base_query_options.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/base_query_options.rs @@ -1,4 +1,5 @@ use std::any::Any; +use std::collections::HashMap; use std::rc::Rc; use cubenativeutils::CubeError; @@ -69,6 +70,8 @@ pub struct MockBaseQueryOptions { pre_aggregation_id: Option, #[builder(default)] masked_members: Option>, + #[builder(default)] + member_to_alias: Option>, } impl_static_data!( @@ -88,7 +91,8 @@ impl_static_data!( cubestore_support_multistage, disable_external_pre_aggregations, pre_aggregation_id, - masked_members + masked_members, + member_to_alias ); pub fn members_from_strings(strings: Vec) -> Vec { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs index 880c70ec9dde6..89e64a3c427c9 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/cube_bridge/yaml/base_query_options.rs @@ -1,6 +1,7 @@ use crate::cube_bridge::base_query_options::{FilterItem, OrderByItem, TimeDimension}; use serde::de; use serde::{Deserialize, Deserializer}; +use std::collections::HashMap; #[derive(Debug, Deserialize)] pub struct YamlBaseQueryOptions { @@ -38,6 +39,12 @@ pub struct YamlBaseQueryOptions { pub pre_aggregation_id: Option, #[serde(default, rename = "joinHints")] pub join_hints: Option>>, + #[serde(default, rename = "memberToAlias")] + pub member_to_alias: Option>, + #[serde(default)] + pub timezone: Option, + #[serde(default, rename = "maskedMembers")] + pub masked_members: Option>, } #[derive(Debug, Deserialize)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/mod.rs index 08d612591025e..e75103544ba2a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/mod.rs @@ -18,7 +18,7 @@ impl TestCompiler { pub fn new_with_timezone(evaluator: Rc, timezone: Tz) -> Self { let base_tools = Rc::new(MockBaseTools::default()); let security_context = Rc::new(MockSecurityContext); - let compiler = Compiler::new(evaluator, base_tools, security_context, timezone); + let compiler = Compiler::new(evaluator, base_tools, security_context, timezone, None); Self { compiler } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs index 523133718b9da..538aef0b5b126 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs @@ -18,34 +18,55 @@ use std::rc::Rc; /// Test context providing query tools and symbol creation helpers pub struct TestContext { + schema: MockSchema, query_tools: Rc, security_context: Rc, } impl TestContext { pub fn new(schema: MockSchema) -> Result { - Self::new_with_timezone(schema, Tz::UTC) + Self::new_with_options(schema, Tz::UTC, None, None, false) } + #[allow(dead_code)] pub fn new_with_timezone(schema: MockSchema, timezone: Tz) -> Result { - Self::new_with_options(schema, timezone, None) + Self::new_with_options(schema, timezone, None, None, false) } pub fn new_with_masked_members( schema: MockSchema, masked_members: Vec, ) -> Result { - Self::new_with_options(schema, Tz::UTC, Some(masked_members)) + Self::new_with_options(schema, Tz::UTC, Some(masked_members), None, false) + } + + fn for_options(&self, options: &dyn BaseQueryOptions) -> Result { + let static_data = options.static_data(); + let timezone = static_data + .timezone + .as_deref() + .and_then(|tz| tz.parse::().ok()) + .unwrap_or(Tz::UTC); + + Self::new_with_options( + self.schema.clone(), + timezone, + static_data.masked_members.clone(), + static_data.member_to_alias.clone(), + static_data.export_annotated_sql, + ) } fn new_with_options( schema: MockSchema, timezone: Tz, masked_members: Option>, + member_to_alias: Option>, + export_annotated_sql: bool, ) -> Result { let base_tools = schema.create_base_tools()?; let join_graph = Rc::new(schema.create_join_graph()?); - let evaluator = schema.create_evaluator(); + let evaluator = schema.clone().create_evaluator(); let security_context: Rc = Rc::new(MockSecurityContext); @@ -55,11 +76,13 @@ impl TestContext { Rc::new(base_tools), join_graph, Some(timezone.to_string()), - false, // export_annotated_sql + export_annotated_sql, masked_members, + member_to_alias, )?; Ok(Self { + schema, query_tools, security_context, }) @@ -280,6 +303,9 @@ impl TestContext { .unwrap_or(false), ) .pre_aggregation_id(yaml_options.pre_aggregation_id) + .member_to_alias(yaml_options.member_to_alias) + .masked_members(yaml_options.masked_members) + .timezone(yaml_options.timezone) .build(), ) } @@ -311,8 +337,9 @@ impl TestContext { query: &str, ) -> Result<(String, Vec>), cubenativeutils::CubeError> { let options = self.create_query_options_from_yaml(query); - let request = QueryProperties::try_new(self.query_tools.clone(), options.clone())?; - let planner = TopLevelPlanner::new(request, self.query_tools.clone(), false); + let ctx = self.for_options(options.as_ref())?; + let request = QueryProperties::try_new(ctx.query_tools.clone(), options)?; + let planner = TopLevelPlanner::new(request, ctx.query_tools.clone(), false); planner.plan() } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs index ce038685e2713..8ea65268a9481 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/common_sql_generation.rs @@ -2,6 +2,43 @@ use crate::test_fixtures::cube_bridge::MockSchema; use crate::test_fixtures::test_utils::TestContext; use indoc::indoc; +#[test] +fn test_member_to_alias() { + let schema = MockSchema::from_yaml_file("common/visitors.yaml"); + let test_context = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {r#" + measures: + - visitors.count + dimensions: + - visitors.source + time_dimensions: + - dimension: visitors.created_at + granularity: day + memberToAlias: + visitors.count: "custom_count" + visitors.source: "custom_source" + visitors.created_at: "custom_created_at" + "#}; + + let sql = test_context + .build_sql(query_yaml) + .expect("Should generate SQL with custom aliases"); + + assert!( + sql.contains("custom_count"), + "SQL should contain custom alias for measure, got: {sql}" + ); + assert!( + sql.contains("custom_source"), + "SQL should contain custom alias for dimension, got: {sql}" + ); + assert!( + sql.contains("custom_created_at"), + "SQL should contain custom alias for time dimension base, got: {sql}" + ); +} + #[test] fn test_simple_join_sql() { let schema = MockSchema::from_yaml_file("common/diamond_joins.yaml"); From 58c06879b93110336740fc7792144ca1165239df Mon Sep 17 00:00:00 2001 From: waralexrom <108349432+waralexrom@users.noreply.github.com> Date: Tue, 17 Mar 2026 18:13:21 +0100 Subject: [PATCH 5/7] fix(tesseract): Subquery dimension in filter (#10506) --- .../postgres/sub-query-dimensions.test.ts | 122 ++++++++++++++++++ .../planner/planners/simple_query_planer.rs | 4 +- .../common/subquery_dimensions.yaml | 48 +++++++ .../cubesqlplanner/src/tests/mod.rs | 1 + ...ons__subquery_dimension_in_projection.snap | 14 ++ ...ns__subquery_dimension_used_in_filter.snap | 13 ++ .../src/tests/subquery_dimensions.rs | 77 +++++++++++ 7 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/subquery_dimensions.yaml create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_in_projection.snap create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_used_in_filter.snap create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/subquery_dimensions.rs diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts index f022048271fe1..55b76da1adb91 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/sub-query-dimensions.test.ts @@ -148,3 +148,125 @@ cube(\`C\`, { c__important_value: '81.5', }])); }); + +describe('Sub Query Dimensions in Filters', () => { + jest.setTimeout(200000); + + const { compiler, joinGraph, cubeEvaluator } = prepareJsCompiler(` +cube(\`Sales\`, { + sql: \` + SELECT 1 AS id, 10 AS customer_id, 100 AS amount UNION ALL + SELECT 2 AS id, 10 AS customer_id, 200 AS amount UNION ALL + SELECT 3 AS id, 20 AS customer_id, 50 AS amount UNION ALL + SELECT 4 AS id, 30 AS customer_id, 75 AS amount + \`, + + joins: { + Customers: { + relationship: \`many_to_one\`, + sql: \`\${CUBE}.customer_id = \${Customers}.id\`, + }, + }, + + measures: { + totalAmount: { + sql: \`amount\`, + type: \`sum\`, + }, + }, + + dimensions: { + id: { + sql: \`id\`, + type: \`number\`, + primaryKey: true, + }, + }, +}); + +cube(\`Customers\`, { + sql: \` + SELECT 10 AS id UNION ALL + SELECT 20 AS id UNION ALL + SELECT 30 AS id + \`, + + joins: { + CustomerOrders: { + relationship: \`one_to_many\`, + sql: \`\${CUBE}.id = \${CustomerOrders}.customer_id\`, + }, + }, + + dimensions: { + id: { + sql: \`id\`, + type: \`number\`, + primaryKey: true, + }, + + totalSpend: { + sql: \`\${CustomerOrders.orderTotal}\`, + type: \`number\`, + subQuery: true, + }, + }, +}); + +cube(\`CustomerOrders\`, { + sql: \` + SELECT 1 AS id, 10 AS customer_id, 80 AS amount UNION ALL + SELECT 2 AS id, 10 AS customer_id, 70 AS amount UNION ALL + SELECT 3 AS id, 20 AS customer_id, 30 AS amount UNION ALL + SELECT 4 AS id, 30 AS customer_id, 200 AS amount + \`, + + dimensions: { + id: { + sql: \`id\`, + type: \`number\`, + primaryKey: true, + }, + + customerId: { + sql: \`\${CUBE}.customer_id\`, + type: \`number\`, + }, + }, + + measures: { + orderTotal: { + sql: \`amount\`, + type: \`sum\`, + }, + }, +}); + `); + + async function runQueryTest(q, expectedResult) { + await compiler.compile(); + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, q); + + console.log(query.buildSqlAndParams()); + + const res = await dbRunner.testQuery(query.buildSqlAndParams()); + console.log(JSON.stringify(res)); + + expect(res).toEqual( + expectedResult + ); + } + + it('subquery dimension used in filter', async () => runQueryTest({ + measures: ['Sales.totalAmount'], + filters: [ + { + member: 'Customers.totalSpend', + operator: 'gt', + values: ['100'], + }, + ], + }, [{ + sales__total_amount: '375', + }])); +}); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs index 4d481f706d784..6b1b7c73de78b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/planners/simple_query_planer.rs @@ -58,7 +58,9 @@ impl SimpleQueryPlanner { let join = self.query_properties.simple_query_join()?; let subquery_dimensions = if let Some(join) = &join { collect_sub_query_dimensions_from_symbols( - &self.query_properties.all_members(false), + &self + .query_properties + .get_member_symbols(true, true, true, true, &vec![]), &self.join_planner, &join, )? diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/subquery_dimensions.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/subquery_dimensions.yaml new file mode 100644 index 0000000000000..8ebb9ccc00f9f --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/subquery_dimensions.yaml @@ -0,0 +1,48 @@ +cubes: + - name: Sales + sql: "SELECT * FROM sales" + joins: + - name: Customers + relationship: many_to_one + sql: "{CUBE}.customer_id = {Customers.id}" + dimensions: + - name: id + type: number + sql: id + primary_key: true + measures: + - name: totalAmount + type: sum + sql: amount + + - name: Customers + sql: "SELECT * FROM customers" + joins: + - name: CustomerOrders + relationship: one_to_many + sql: "{CUBE}.id = {CustomerOrders.customerId}" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: totalSpend + type: number + sql: "{CustomerOrders.orderTotal}" + sub_query: true + measures: [] + + - name: CustomerOrders + sql: "SELECT * FROM customer_orders" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: customerId + type: number + sql: "{CUBE}.customer_id" + measures: + - name: orderTotal + type: sum + sql: amount diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/mod.rs index fcb42c70992b7..ffb988e85dd0b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/mod.rs @@ -7,4 +7,5 @@ mod join_hints_collector; mod measure_symbol; mod member_expressions_on_views; mod pre_aggregation_sql_generation; +mod subquery_dimensions; mod utils; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_in_projection.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_in_projection.snap new file mode 100644 index 0000000000000..7ff74c3ee1248 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_in_projection.snap @@ -0,0 +1,14 @@ +--- +source: cubesqlplanner/src/tests/subquery_dimensions.rs +expression: sql +--- +SELECT "Customers_totalSpend_subquery"."total_spend" "customers__total_spend", sum("sales".amount) "sales__total_amount" +FROM sales AS "sales" +LEFT JOIN customers AS "customers" ON "sales".customer_id = "customers".id +LEFT JOIN (SELECT "customers".id "customers__id", sum("customer_orders".amount) "total_spend" +FROM customers AS "customers" +LEFT JOIN customer_orders AS "customer_orders" ON "customers".id = "customer_orders".customer_id +GROUP BY 1 +ORDER BY 2 DESC) AS "Customers_totalSpend_subquery" ON (("Customers_totalSpend_subquery"."customers__id" = "customers".id)) +GROUP BY 1 +ORDER BY 2 DESC diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_used_in_filter.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_used_in_filter.snap new file mode 100644 index 0000000000000..76645496fca28 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__subquery_dimensions__subquery_dimension_used_in_filter.snap @@ -0,0 +1,13 @@ +--- +source: cubesqlplanner/src/tests/subquery_dimensions.rs +expression: sql +--- +SELECT sum("sales".amount) "sales__total_amount" +FROM sales AS "sales" +LEFT JOIN customers AS "customers" ON "sales".customer_id = "customers".id +LEFT JOIN (SELECT "customers".id "customers__id", sum("customer_orders".amount) "total_spend" +FROM customers AS "customers" +LEFT JOIN customer_orders AS "customer_orders" ON "customers".id = "customer_orders".customer_id +GROUP BY 1 +ORDER BY 2 DESC) AS "Customers_totalSpend_subquery" ON (("Customers_totalSpend_subquery"."customers__id" = "customers".id)) +WHERE ("Customers_totalSpend_subquery"."total_spend" > $_0_$) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/subquery_dimensions.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/subquery_dimensions.rs new file mode 100644 index 0000000000000..7e36e4850a2b5 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/subquery_dimensions.rs @@ -0,0 +1,77 @@ +use crate::test_fixtures::cube_bridge::MockSchema; +use crate::test_fixtures::test_utils::TestContext; +use indoc::indoc; + +#[test] +fn test_subquery_dimension_used_in_filter() { + let schema = MockSchema::from_yaml_file("common/subquery_dimensions.yaml"); + let test_context = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {r#" + measures: + - Sales.totalAmount + filters: + - member: Customers.totalSpend + operator: gt + values: + - "100" + "#}; + + let sql = test_context + .build_sql(query_yaml) + .expect("Should generate SQL for subquery dimension used in filter"); + + // Subquery dimension in filter must be resolved as a correlated subquery, + // not inlined as a raw aggregate expression + assert!( + sql.contains("SELECT"), + "Filter on subquery dimension should produce a subquery in WHERE, got: {sql}" + ); + let subquery_count = sql.matches("SELECT").count(); + assert!( + subquery_count >= 2, + "Expected at least 2 SELECTs (main + subquery), got {subquery_count} in: {sql}" + ); + + // Raw aggregate must not leak into WHERE — it belongs inside the subquery + let where_pos = sql.find("WHERE").expect("SQL should have WHERE clause"); + let where_clause = &sql[where_pos..]; + assert!( + !where_clause.contains("sum(\"customer_orders\""), + "WHERE clause must not contain raw aggregate from CustomerOrders, got: {where_clause}" + ); + + insta::assert_snapshot!(sql); +} + +#[test] +fn test_subquery_dimension_in_projection() { + let schema = MockSchema::from_yaml_file("common/subquery_dimensions.yaml"); + let test_context = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {" + measures: + - Sales.totalAmount + dimensions: + - Customers.totalSpend + "}; + + let sql = test_context + .build_sql(query_yaml) + .expect("Should generate SQL for subquery dimension in projection"); + + // Subquery dimension in SELECT must be resolved as a correlated subquery + let subquery_count = sql.matches("SELECT").count(); + assert!( + subquery_count >= 2, + "Expected at least 2 SELECTs (main + subquery), got {subquery_count} in: {sql}" + ); + + // Raw aggregate from CustomerOrders must not appear directly in the outer SELECT + assert!( + !sql.starts_with("SELECT sum(\"customer_orders\""), + "Outer SELECT must not contain raw aggregate from CustomerOrders, got: {sql}" + ); + + insta::assert_snapshot!(sql); +} From 6a0bd80bd57a1aa7a04d0fdf1dea3f3dfa9055ef Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 17 Mar 2026 18:46:17 +0100 Subject: [PATCH 6/7] chore(cubestore): Bump lz4_flex from 0.11.3 to 0.11.6 (#10501) Bumps [lz4_flex](https://github.com/pseitz/lz4_flex) from 0.11.3 to 0.11.6. - [Release notes](https://github.com/pseitz/lz4_flex/releases) - [Changelog](https://github.com/PSeitz/lz4_flex/blob/main/CHANGELOG.md) - [Commits](https://github.com/pseitz/lz4_flex/commits/0.11.6) --- updated-dependencies: - dependency-name: lz4_flex dependency-version: 0.11.6 dependency-type: indirect ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- rust/cubestore/Cargo.lock | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index a3a99030985b6..57a86b8f847f2 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -1491,7 +1491,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "twox-hash", + "twox-hash 1.6.3", ] [[package]] @@ -3700,11 +3700,11 @@ dependencies = [ [[package]] name = "lz4_flex" -version = "0.11.3" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" +checksum = "373f5eceeeab7925e0c1098212f2fbc4d416adec9d35051a6ab251e824c1854a" dependencies = [ - "twox-hash", + "twox-hash 2.1.2", ] [[package]] @@ -4017,7 +4017,7 @@ dependencies = [ "sha1 0.6.0", "sha2 0.8.2", "time 0.2.7", - "twox-hash", + "twox-hash 1.6.3", "uuid 0.8.2", ] @@ -4574,7 +4574,7 @@ dependencies = [ "snap", "thrift 0.17.0", "tokio", - "twox-hash", + "twox-hash 1.6.3", "zstd", "zstd-sys", ] @@ -6734,11 +6734,17 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 1.0.0", - "rand 0.7.3", + "cfg-if 0.1.10", + "rand 0.6.5", "static_assertions", ] +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" + [[package]] name = "typenum" version = "1.17.0" From 66ccdf6101dc95cdd4bfc5b256095fecca1194aa Mon Sep 17 00:00:00 2001 From: Artyom Keydunov Date: Tue, 17 Mar 2026 11:30:46 -0700 Subject: [PATCH 7/7] docs: remove outdated notices, video, and Semantic Layer Sync mention (#10512) - Remove DAX API preview warning (no longer in preview) - Remove Power BI info box from Semantic Layer Sync page - Remove Loom video and Semantic Layer Sync reference from Metabase page Made-with: Cursor --- .../apis-integrations/core-data-apis/dax-api/index.mdx | 6 ------ .../apis-integrations/semantic-layer-sync/index.mdx | 9 --------- .../configuration/visualization-tools/metabase.mdx | 8 +------- 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx b/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx index e8f0197872af6..4a965ac625bec 100644 --- a/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx +++ b/docs/content/product/apis-integrations/core-data-apis/dax-api/index.mdx @@ -18,12 +18,6 @@ Also requires the M [deployment tier](/product/administration/pricing#deployment - - -The DAX API is currently in preview. Please [contact us](https://cube.dev/contact) to enable it for your account. - - - Read below about the DAX API [configuration](#configuration), [authentication](#authentication), and [using it](#using-dax-api-with-power-bi) with Power BI. diff --git a/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx b/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx index 89a58480552f2..06874010d6cad 100644 --- a/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx +++ b/docs/content/product/apis-integrations/semantic-layer-sync/index.mdx @@ -50,13 +50,6 @@ on configuration and features for specific tools: /> - - -Previously, Semantic Layer Sync provided a way to connect to [Microsoft Power BI][ref-powerbi]. -Currently, it's recommended to use the [DAX API][ref-dax-api]. - - - ## Creating syncs You can create a new sync by navigating to IDE → Integrations @@ -381,8 +374,6 @@ on, i.e., your development mode branch, shared branch, or main branch. [ref-data-model]: /product/data-modeling/overview [ref-sql-api]: /product/apis-integrations/sql-api -[ref-powerbi]: /product/configuration/visualization-tools/powerbi -[ref-dax-api]: /product/apis-integrations/dax-api [ref-config-file]: /product/configuration#configuration-options [ref-config-contexts]: /product/configuration/reference/config#scheduledrefreshcontexts [ref-config-schemaversion]: /product/configuration/reference/config#schema_version diff --git a/docs/content/product/configuration/visualization-tools/metabase.mdx b/docs/content/product/configuration/visualization-tools/metabase.mdx index b340ab8df6163..ba3783416aebe 100644 --- a/docs/content/product/configuration/visualization-tools/metabase.mdx +++ b/docs/content/product/configuration/visualization-tools/metabase.mdx @@ -10,12 +10,9 @@ Webinar recording: [Instant Insights Unlocked with Cube & Metabase](https://cube - - ## Connect from Cube Cloud -It is recommended to use [Semantic Layer Sync][ref-sls] to connect Cube Cloud to -Metabase. It automatically synchronizes the [data model][ref-data-model] with Metabase. +You can connect Cube Cloud to Metabase using the [SQL API][ref-sql-api]. Navigate to the [Integrations](/product/workspace/integrations#connect-specific-tools) page, click Connect to Cube, and choose Metabase to get @@ -82,6 +79,3 @@ well. [ref-sql-api]: /product/apis-integrations/sql-api [metabase-oss]: https://github.com/metabase/metabase [metabase]: https://www.metabase.com -[ref-sls]: /product/apis-integrations/semantic-layer-sync -[ref-sql-api]: /product/apis-integrations/sql-api -[ref-data-model]: /product/data-modeling/overview