diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index c926861a8f91e..e13ac688b7290 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -451,7 +451,7 @@ class ApiGateway { try { await this.assertApiScope('data', req.context?.securityContext); - await this.sqlServer.execSql(req.body.query, res, req.context?.securityContext, req.body.cache, req.body.timezone); + await this.sqlServer.execSql(req.body.query, res, req.context?.securityContext, req.body.cache, req.body.timezone, req.body.throwContinueWait); } catch (e: any) { // Quickfix for https://github.com/cube-js/cube/issues/10450, // Right now, it's too complicated to fix the issue correctly, because diff --git a/packages/cubejs-api-gateway/src/sql-server.ts b/packages/cubejs-api-gateway/src/sql-server.ts index 1a7b0beec8016..b121530dc22b7 100644 --- a/packages/cubejs-api-gateway/src/sql-server.ts +++ b/packages/cubejs-api-gateway/src/sql-server.ts @@ -74,8 +74,8 @@ export class SQLServer { return this.sqlInterfaceInstance; } - public async execSql(sqlQuery: string, stream: any, securityContext?: any, cacheMode?: CacheMode, timezone?: string) { - await execSql(this.getSqlInterfaceInstance(), sqlQuery, stream, securityContext, cacheMode, timezone); + public async execSql(sqlQuery: string, stream: any, securityContext?: any, cacheMode?: CacheMode, timezone?: string, throwContinueWait?: boolean) { + await execSql(this.getSqlInterfaceInstance(), sqlQuery, stream, securityContext, cacheMode, timezone, throwContinueWait); } public async sql4sql(sqlQuery: string, disablePostProcessing: boolean, securityContext?: unknown): Promise { diff --git a/packages/cubejs-api-gateway/test/index.test.ts b/packages/cubejs-api-gateway/test/index.test.ts index d5dbf9d6e2601..245daf2ae1830 100644 --- a/packages/cubejs-api-gateway/test/index.test.ts +++ b/packages/cubejs-api-gateway/test/index.test.ts @@ -1222,7 +1222,8 @@ describe('API Gateway', () => { expect.anything(), {}, undefined, - undefined + undefined, + undefined, ); }); @@ -1261,7 +1262,44 @@ describe('API Gateway', () => { expect.anything(), {}, 'stale-while-revalidate', - 'America/Los_Angeles' + 'America/Los_Angeles', + undefined, + ); + }); + + test('throwContinueWait can be passed', async () => { + const { app, apiGateway } = await createApiGateway(); + + // Mock the sqlServer.execSql method + const execSqlMock = jest.fn(async (query, stream, securityContext, cacheMode, timezone) => { + // Simulate writing error to the stream + stream.write(`${JSON.stringify({ + error: "Continue wait" + })}\n`); + stream.end(); + }); + + apiGateway.getSQLServer().execSql = execSqlMock; + + await request(app) + .post('/cubejs-api/v1/cubesql') + .set('Content-type', 'application/json') + .set('Authorization', 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.e30.t-IDcSemACt8x4iTMCda8Yhe3iZaWbvV5XKSTbuAn0M') + .send({ + query: 'SELECT id FROM test LIMIT 3', + throwContinueWait: true, + }) + .responseType('text') + .expect(200); + + // Verify the mock was called with correct parameters + expect(execSqlMock).toHaveBeenCalledWith( + 'SELECT id FROM test LIMIT 3', + expect.anything(), + {}, + undefined, + undefined, + true, ); }); }); diff --git a/packages/cubejs-backend-native/Cargo.toml b/packages/cubejs-backend-native/Cargo.toml index 69a3b10e672fb..f8e642114eb6e 100644 --- a/packages/cubejs-backend-native/Cargo.toml +++ b/packages/cubejs-backend-native/Cargo.toml @@ -57,3 +57,6 @@ neon-debug = [] neon-entrypoint = [] python = ["pyo3", "pyo3-asyncio"] async-log = ["log_nonblock"] + +[lints.clippy] +too_many_arguments = "allow" diff --git a/packages/cubejs-backend-native/js/index.ts b/packages/cubejs-backend-native/js/index.ts index b0f74b9be87be..d085f65ef3e25 100644 --- a/packages/cubejs-backend-native/js/index.ts +++ b/packages/cubejs-backend-native/js/index.ts @@ -438,10 +438,10 @@ export const shutdownInterface = async (instance: SqlInterfaceInstance, shutdown await native.shutdownInterface(instance, shutdownMode); }; -export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string, stream: any, securityContext?: any, cacheMode: CacheMode = 'stale-if-slow', timezone?: string): Promise => { +export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string, stream: any, securityContext?: any, cacheMode: CacheMode = 'stale-if-slow', timezone?: string, throwContinueWait?: boolean): Promise => { const native = loadNative(); - await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null, cacheMode, timezone); + await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null, cacheMode, timezone, throwContinueWait); }; // TODO parse result from native code diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 63560c625640b..8c4e6885fc7b7 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -227,6 +227,7 @@ async fn handle_sql_query( sql_query: &str, cache_mode: &str, timezone: Option, + throw_continue_wait: bool, ) -> Result<(), CubeError> { let span_id = Some(Arc::new(SpanId::new( Uuid::new_v4().to_string(), @@ -276,6 +277,15 @@ async fn handle_sql_query( *cm = Some(cache_enum); } + { + let mut cm = session + .state + .throw_continue_wait + .write() + .expect("failed to unlock session throw_continue_wait for change"); + *cm = throw_continue_wait; + } + let session_clone = Arc::clone(&session); let span_id_clone = span_id.clone(); @@ -471,6 +481,20 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult { Err(_) => None, }; + let throw_continue_wait: bool = match cx.argument::(6) { + Ok(val) => { + if val.is_a::(&mut cx) || val.is_a::(&mut cx) { + false + } else { + match val.downcast::(&mut cx) { + Ok(v) => v.value(&mut cx), + Err(_) => false, + } + } + } + Err(_) => false, + }; + let js_stream_on_fn = Arc::new( node_stream .get::(&mut cx, "on")? @@ -520,6 +544,7 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult { &sql_query, &cache_mode, timezone, + throw_continue_wait, ) .await; diff --git a/packages/cubejs-backend-native/src/transport.rs b/packages/cubejs-backend-native/src/transport.rs index d53f743b2c965..9db2d4407be89 100644 --- a/packages/cubejs-backend-native/src/transport.rs +++ b/packages/cubejs-backend-native/src/transport.rs @@ -345,6 +345,7 @@ impl TransportService for NodeBridgeTransport { schema: SchemaRef, member_fields: Vec, cache_mode: Option, + throw_continue_wait: bool, ) -> Result, CubeError> { trace!("[transport] Request ->"); @@ -451,6 +452,9 @@ impl TransportService for NodeBridgeTransport { if let Err(e) = &result { if e.message.to_lowercase().contains("continue wait") { + if throw_continue_wait { + return Err(CubeError::internal("Continue wait".to_string())); + } continue; } } @@ -471,15 +475,24 @@ impl TransportService for NodeBridgeTransport { match error_value { serde_json::Value::String(error) => { if error.to_lowercase() == *"continue wait" { - debug!( - "[transport] load - retrying request (continue wait) requestId: {}", - request_id - ); + if throw_continue_wait { + debug!( + "[transport] load - throwing continue wait, requestId: {}", + request_id + ); + return Err(CubeError::internal( + "Continue wait".to_string(), + )); + } + debug!( + "[transport] load - retrying request (continue wait) requestId: {}", + request_id + ); continue; - } else { - return Err(CubeError::user(error.clone())); } + + return Err(CubeError::user(error.clone())); } other => { error!( @@ -538,6 +551,7 @@ impl TransportService for NodeBridgeTransport { meta: LoadRequestMeta, schema: SchemaRef, member_fields: Vec, + throw_continue_wait: bool, ) -> Result { trace!("[transport] Request ->"); @@ -585,6 +599,9 @@ impl TransportService for NodeBridgeTransport { if let Err(e) = &res { if e.message.to_lowercase().contains("continue wait") { + if throw_continue_wait { + return Err(CubeError::internal("Continue wait".to_string())); + } continue; } } diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index bc3dfecbca31a..50aeb49935dfd 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -111,6 +111,7 @@ pub struct CubeScanOptions { pub change_user: Option, pub max_records: Option, pub cache_mode: Option, + pub throw_continue_wait: bool, } #[derive(Debug, Clone)] @@ -481,6 +482,7 @@ impl ExecutionPlan for CubeScanExecutionPlan { meta, self.schema.clone(), self.member_fields.clone(), + self.options.throw_continue_wait, ) .await; let stream = result.map_err(|err| DataFusionError::External(Box::new(err)))?; @@ -721,6 +723,7 @@ async fn load_data( schema, member_fields, options.cache_mode, + options.throw_continue_wait, ) .await .map_err(|mut err| { @@ -1260,6 +1263,7 @@ mod tests { schema: SchemaRef, member_fields: Vec, _cache_mode: Option, + _throw_continue_wait: bool, ) -> Result, CubeError> { let response = r#" { @@ -1295,6 +1299,7 @@ mod tests { _meta_fields: LoadRequestMeta, _schema: SchemaRef, _member_fields: Vec, + _throw_continue_wait: bool, ) -> Result { panic!("It's a fake transport"); } @@ -1386,6 +1391,7 @@ mod tests { change_user: None, max_records: None, cache_mode: None, + throw_continue_wait: false, }, transport: get_test_transport(), meta: get_test_load_meta(DatabaseProtocol::PostgreSQL), diff --git a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs index 9207df9f950e0..e73fffdf386a4 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs @@ -2081,6 +2081,13 @@ impl LanguageToLogicalPlanConverter { .read() .expect("failed to read lock for session cache_mode"); + let throw_continue_wait = *self + .cube_context + .session_state + .throw_continue_wait + .read() + .expect("failed to read lock for session throw_continue_wait"); + let node = Arc::new(CubeScanNode::new( Arc::new(DFSchema::new_with_metadata( fields.into_iter().map(|(f, _)| f).collect(), @@ -2093,6 +2100,7 @@ impl LanguageToLogicalPlanConverter { change_user, max_records, cache_mode: cache_mode.clone(), + throw_continue_wait, }, alias_to_cube.into_iter().map(|(_, c)| c).unique().collect(), self.span_id.clone(), diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index 17ab746633ef5..43a3be7c44f5f 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -916,6 +916,7 @@ impl TransportService for TestConnectionTransport { schema: SchemaRef, member_fields: Vec, _cache_mode: Option, + _throw_continue_wait: bool, ) -> Result, CubeError> { { let mut calls = self.load_calls.lock().await; @@ -958,6 +959,7 @@ impl TransportService for TestConnectionTransport { _meta_fields: LoadRequestMeta, _schema: SchemaRef, _member_fields: Vec, + _throw_continue_wait: bool, ) -> Result { panic!("It's a fake transport"); } diff --git a/rust/cubesql/cubesql/src/sql/session.rs b/rust/cubesql/cubesql/src/sql/session.rs index a1e3b6589b8a2..610d4c77c5f19 100644 --- a/rust/cubesql/cubesql/src/sql/session.rs +++ b/rust/cubesql/cubesql/src/sql/session.rs @@ -94,6 +94,8 @@ pub struct SessionState { pub cache_mode: RwLockSync>, pub query_timezone: RwLockSync>, + + pub throw_continue_wait: RwLockSync, } impl SessionState { @@ -127,6 +129,7 @@ impl SessionState { auth_context_expiration, cache_mode: RwLockSync::new(None), query_timezone: RwLockSync::new(None), + throw_continue_wait: RwLockSync::new(false), } } diff --git a/rust/cubesql/cubesql/src/transport/service.rs b/rust/cubesql/cubesql/src/transport/service.rs index 0b16fa10b7576..2ebda4676d736 100644 --- a/rust/cubesql/cubesql/src/transport/service.rs +++ b/rust/cubesql/cubesql/src/transport/service.rs @@ -146,6 +146,7 @@ pub trait TransportService: Send + Sync + Debug { schema: SchemaRef, member_fields: Vec, cache_mode: Option, + throw_continue_wait: bool, ) -> Result, CubeError>; async fn load_stream( @@ -157,6 +158,7 @@ pub trait TransportService: Send + Sync + Debug { meta_fields: LoadRequestMeta, schema: SchemaRef, member_fields: Vec, + throw_continue_wait: bool, ) -> Result; async fn can_switch_user_for_session( @@ -287,6 +289,7 @@ impl TransportService for HttpTransport { schema: SchemaRef, member_fields: Vec, cache_mode: Option, + _throw_continue_wait: bool, ) -> Result, CubeError> { if meta.change_user().is_some() { return Err(CubeError::internal( @@ -328,6 +331,7 @@ impl TransportService for HttpTransport { _meta_fields: LoadRequestMeta, _schema: SchemaRef, _member_fields: Vec, + _throw_continue_wait: bool, ) -> Result { panic!("Does not work for standalone mode yet"); }