Skip to content

Commit b5a0ec3

Browse files
authored
feat(cubestore): Queue - add exclusivity support (cube-js#10479)
QUEUE ADD command now accepts an EXCLUSIVE flag, and QUEUE RETRIEVE passes the caller's process_id for exclusivity checks.
1 parent 4b8ac6f commit b5a0ec3

8 files changed

Lines changed: 316 additions & 46 deletions

File tree

rust/cubestore/cubestore/benches/cachestore_queue.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ async fn do_insert(
6161
value: "a".repeat(size_kb * 1024), // size in bytes
6262
priority: 0,
6363
orphaned: None,
64+
process_id: None,
65+
exclusive: false,
6466
});
6567

6668
let res = fut.await;

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,8 @@ pub struct QueueAddPayload {
681681
pub value: String,
682682
pub priority: i64,
683683
pub orphaned: Option<u32>,
684+
pub process_id: Option<String>,
685+
pub exclusive: bool,
684686
}
685687

686688
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq)]
@@ -831,6 +833,7 @@ pub trait CacheStore: DIService + Send + Sync {
831833
&self,
832834
path: String,
833835
allow_concurrency: u32,
836+
caller_process_id: Option<String>,
834837
) -> Result<QueueRetrieveResponse, CubeError>;
835838
async fn queue_ack(&self, key: QueueKey, result: Option<String>) -> Result<bool, CubeError>;
836839
async fn queue_result_by_path(
@@ -1103,6 +1106,8 @@ impl CacheStore for RocksCacheStore {
11031106
QueueItem::status_default(),
11041107
payload.priority,
11051108
payload.orphaned.clone(),
1109+
payload.process_id,
1110+
payload.exclusive,
11061111
),
11071112
batch_pipe,
11081113
)?;
@@ -1300,6 +1305,7 @@ impl CacheStore for RocksCacheStore {
13001305
&self,
13011306
path: String,
13021307
allow_concurrency: u32,
1308+
caller_process_id: Option<String>,
13031309
) -> Result<QueueRetrieveResponse, CubeError> {
13041310
self.write_operation_queue("queue_retrieve_by_path", move |db_ref, batch_pipe| {
13051311
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
@@ -1334,9 +1340,33 @@ impl CacheStore for RocksCacheStore {
13341340
};
13351341

13361342
if id_row.get_row().get_status() == &QueueItemStatus::Pending {
1343+
if id_row.get_row().get_exclusive() {
1344+
match (id_row.get_row().get_process_id(), &caller_process_id) {
1345+
(Some(_), None) => return Err(CubeError::user(
1346+
"QUEUE RETRIEVE requires a process_id in the connection context (x-process-id header)".to_string(),
1347+
)),
1348+
(None, Some(_)) => {
1349+
log::warn!("Incorrect queue_item with exclusive flag, empty process_id, id: {:?}", caller_process_id);
1350+
1351+
return Ok(QueueRetrieveResponse::NotFound { pending, active })
1352+
}
1353+
(Some(item_process_id), Some(caller_id)) => if item_process_id == caller_id {
1354+
// OK, caller matches the exclusive item owner
1355+
} else {
1356+
return Ok(QueueRetrieveResponse::ExclusiveAccessFailed {
1357+
pending,
1358+
active,
1359+
})
1360+
},
1361+
(None, None) => {
1362+
// No process_id on item and no caller — allow retrieval
1363+
}
1364+
}
1365+
}
1366+
13371367
let mut new = id_row.get_row().clone();
13381368
new.status = QueueItemStatus::Active;
1339-
// It's important to insert heartbeat, because
1369+
// It's important to insert heartbeat, because
13401370
// without that created datetime will be used for orphaned filtering
13411371
new.update_heartbeat();
13421372

@@ -1652,6 +1682,7 @@ impl CacheStore for ClusterCacheStoreClient {
16521682
&self,
16531683
_path: String,
16541684
_allow_concurrency: u32,
1685+
_caller_process_id: Option<String>,
16551686
) -> Result<QueueRetrieveResponse, CubeError> {
16561687
panic!("CacheStore cannot be used on the worker node! queue_retrieve_by_path was used.")
16571688
}
@@ -2074,19 +2105,47 @@ mod tests {
20742105
let now = Utc::now();
20752106
let item_pending_custom_orphaned = IdRow::new(
20762107
1,
2077-
QueueItem::new("1".to_string(), QueueItemStatus::Pending, 1, Some(10)),
2108+
QueueItem::new(
2109+
"1".to_string(),
2110+
QueueItemStatus::Pending,
2111+
1,
2112+
Some(10),
2113+
None,
2114+
false,
2115+
),
20782116
);
20792117
let item_pending_custom_orphaned_expired = IdRow::new(
20802118
2,
2081-
QueueItem::new("2".to_string(), QueueItemStatus::Pending, 1, Some(1)),
2119+
QueueItem::new(
2120+
"2".to_string(),
2121+
QueueItemStatus::Pending,
2122+
1,
2123+
Some(1),
2124+
None,
2125+
false,
2126+
),
20822127
);
20832128
let item_active_custom_orphaned = IdRow::new(
20842129
3,
2085-
QueueItem::new("3".to_string(), QueueItemStatus::Active, 1, Some(10)),
2130+
QueueItem::new(
2131+
"3".to_string(),
2132+
QueueItemStatus::Active,
2133+
1,
2134+
Some(10),
2135+
None,
2136+
false,
2137+
),
20862138
);
20872139
let mut item_active_custom_orphaned_expired = IdRow::new(
20882140
4,
2089-
QueueItem::new("4".to_string(), QueueItemStatus::Active, 1, Some(1)),
2141+
QueueItem::new(
2142+
"4".to_string(),
2143+
QueueItemStatus::Active,
2144+
1,
2145+
Some(1),
2146+
None,
2147+
false,
2148+
),
20902149
);
20912150

20922151
assert_eq!(

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,10 +272,11 @@ impl CacheStore for LazyRocksCacheStore {
272272
&self,
273273
path: String,
274274
allow_concurrency: u32,
275+
caller_process_id: Option<String>,
275276
) -> Result<QueueRetrieveResponse, CubeError> {
276277
self.init()
277278
.await?
278-
.queue_retrieve_by_path(path, allow_concurrency)
279+
.queue_retrieve_by_path(path, allow_concurrency, caller_process_id)
279280
.await
280281
}
281282

rust/cubestore/cubestore/src/cachestore/queue_item.rs

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ pub struct QueueItem {
8686
orphaned: Option<DateTime<Utc>>,
8787
#[serde(with = "ts_seconds")]
8888
expire: DateTime<Utc>,
89+
#[serde(default)]
90+
pub(crate) process_id: Option<String>,
91+
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
92+
pub(crate) exclusive: bool,
8993
}
9094

9195
impl RocksEntity for QueueItem {
@@ -134,6 +138,8 @@ impl QueueItem {
134138
status: QueueItemStatus,
135139
priority: i64,
136140
orphaned: Option<u32>,
141+
process_id: Option<String>,
142+
exclusive: bool,
137143
) -> Self {
138144
let (prefix, key) = QueueItem::parse_path(path);
139145
let created = Utc::now();
@@ -156,6 +162,8 @@ impl QueueItem {
156162
created.clone() + Duration::hours(4)
157163
},
158164
created,
165+
process_id,
166+
exclusive,
159167
}
160168
}
161169

@@ -213,6 +221,14 @@ impl QueueItem {
213221
&self.expire
214222
}
215223

224+
pub fn get_process_id(&self) -> &Option<String> {
225+
&self.process_id
226+
}
227+
228+
pub fn get_exclusive(&self) -> bool {
229+
self.exclusive
230+
}
231+
216232
pub fn status_default() -> QueueItemStatus {
217233
QueueItemStatus::Pending
218234
}
@@ -260,6 +276,10 @@ pub enum QueueRetrieveResponse {
260276
pending: u64,
261277
active: Vec<String>,
262278
},
279+
ExclusiveAccessFailed {
280+
pending: u64,
281+
active: Vec<String>,
282+
},
263283
}
264284

265285
impl QueueRetrieveResponse {
@@ -288,7 +308,8 @@ impl QueueRetrieveResponse {
288308
])],
289309
QueueRetrieveResponse::LockFailed { pending, active }
290310
| QueueRetrieveResponse::NotEnoughConcurrency { pending, active }
291-
| QueueRetrieveResponse::NotFound { pending, active } => {
311+
| QueueRetrieveResponse::NotFound { pending, active }
312+
| QueueRetrieveResponse::ExclusiveAccessFailed { pending, active } => {
292313
if extended {
293314
vec![Row::new(vec![
294315
TableValue::Null,
@@ -441,12 +462,54 @@ mod tests {
441462

442463
#[test]
443464
fn test_queue_item_sort() -> Result<(), CubeError> {
444-
let priority0_1 = QueueItem::new("1".to_string(), QueueItemStatus::Active, 0, None);
445-
let mut priority0_2 = QueueItem::new("2".to_string(), QueueItemStatus::Active, 0, None);
446-
let mut priority0_3 = QueueItem::new("3".to_string(), QueueItemStatus::Active, 0, None);
447-
let mut priority10_4 = QueueItem::new("4".to_string(), QueueItemStatus::Active, 10, None);
448-
let mut priority0_5 = QueueItem::new("5".to_string(), QueueItemStatus::Active, 0, None);
449-
let mut priority_n5_6 = QueueItem::new("6".to_string(), QueueItemStatus::Active, -5, None);
465+
let priority0_1 = QueueItem::new(
466+
"1".to_string(),
467+
QueueItemStatus::Active,
468+
0,
469+
None,
470+
None,
471+
false,
472+
);
473+
let mut priority0_2 = QueueItem::new(
474+
"2".to_string(),
475+
QueueItemStatus::Active,
476+
0,
477+
None,
478+
None,
479+
false,
480+
);
481+
let mut priority0_3 = QueueItem::new(
482+
"3".to_string(),
483+
QueueItemStatus::Active,
484+
0,
485+
None,
486+
None,
487+
false,
488+
);
489+
let mut priority10_4 = QueueItem::new(
490+
"4".to_string(),
491+
QueueItemStatus::Active,
492+
10,
493+
None,
494+
None,
495+
false,
496+
);
497+
let mut priority0_5 = QueueItem::new(
498+
"5".to_string(),
499+
QueueItemStatus::Active,
500+
0,
501+
None,
502+
None,
503+
false,
504+
);
505+
let mut priority_n5_6 = QueueItem::new(
506+
"6".to_string(),
507+
QueueItemStatus::Active,
508+
-5,
509+
None,
510+
None,
511+
false,
512+
);
450513

451514
// Force timestamps to be distinct (on systems that are too fast or have low clock resolution)
452515
for (i, item) in (1..).zip([

rust/cubestore/cubestore/src/queryplanner/info_schema/system_queue.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ use crate::queryplanner::info_schema::timestamp_nanos_or_panic;
33
use crate::queryplanner::{InfoSchemaTableDef, InfoSchemaTableDefContext};
44
use crate::CubeError;
55
use async_trait::async_trait;
6-
use datafusion::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampNanosecondArray};
6+
use datafusion::arrow::array::{
7+
ArrayRef, BooleanArray, Int64Array, StringArray, TimestampNanosecondArray,
8+
};
79
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
810
use std::sync::Arc;
911

@@ -44,6 +46,8 @@ impl InfoSchemaTableDef for SystemQueueTableDef {
4446
),
4547
Field::new("value", DataType::Utf8, false),
4648
Field::new("extra", DataType::Utf8, true),
49+
Field::new("process_id", DataType::Utf8, true),
50+
Field::new("exclusive", DataType::Boolean, false),
4751
]
4852
}
4953

@@ -116,6 +120,20 @@ impl InfoSchemaTableDef for SystemQueueTableDef {
116120
.map(|row| row.item.get_row().get_extra().clone()),
117121
))
118122
}),
123+
Box::new(|items| {
124+
Arc::new(StringArray::from_iter(
125+
items
126+
.iter()
127+
.map(|row| row.item.get_row().get_process_id().clone()),
128+
))
129+
}),
130+
Box::new(|items| {
131+
Arc::new(BooleanArray::from_iter(
132+
items
133+
.iter()
134+
.map(|row| Some(row.item.get_row().get_exclusive())),
135+
))
136+
}),
119137
]
120138
}
121139
}

rust/cubestore/cubestore/src/queryplanner/test_utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,7 @@ impl CacheStore for CacheStoreMock {
848848
&self,
849849
_path: String,
850850
_allow_concurrency: u32,
851+
_caller_process_id: Option<String>,
851852
) -> Result<QueueRetrieveResponse, CubeError> {
852853
panic!("CacheStore mock!")
853854
}

rust/cubestore/cubestore/src/sql/cachestore.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ impl CacheStoreSqlService {
305305

306306
pub async fn exec_queue_command_with_context(
307307
&self,
308-
_context: SqlQueryContext,
308+
context: SqlQueryContext,
309309
command: QueueCommand,
310310
) -> Result<Arc<DataFrame>, CubeError> {
311311
let command_tag = command.as_tag_command();
@@ -323,10 +323,17 @@ impl CacheStoreSqlService {
323323
let (result, additional_traffic, track_time) = match command {
324324
QueueCommand::Add {
325325
key,
326+
exclusive,
326327
priority,
327328
orphaned,
328329
value,
329330
} => {
331+
if exclusive && context.process_id.is_none() {
332+
return Err(CubeError::user(
333+
"QUEUE ADD EXCLUSIVE requires a process_id in the connection context (x-process-id header)".to_string(),
334+
));
335+
}
336+
330337
let value_size = key.value.deep_size_of() + value.deep_size_of();
331338
let response = self
332339
.cachestore
@@ -335,6 +342,8 @@ impl CacheStoreSqlService {
335342
value,
336343
priority,
337344
orphaned,
345+
process_id: context.process_id.clone(),
346+
exclusive,
338347
})
339348
.await?;
340349

@@ -491,7 +500,7 @@ impl CacheStoreSqlService {
491500
} => {
492501
let result = self
493502
.cachestore
494-
.queue_retrieve_by_path(key.value, concurrency)
503+
.queue_retrieve_by_path(key.value, concurrency, context.process_id.clone())
495504
.await?;
496505

497506
(

0 commit comments

Comments
 (0)