Skip to content

Commit f802791

Browse files
committed
feat: support batch lookup
1 parent 9d3917c commit f802791

12 files changed

Lines changed: 1225 additions & 65 deletions

File tree

crates/fluss/src/client/connection.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use crate::client::WriterClient;
1919
use crate::client::admin::FlussAdmin;
20+
use crate::client::lookup::LookupClient;
2021
use crate::client::metadata::Metadata;
2122
use crate::client::table::FlussTable;
2223
use crate::config::Config;
@@ -32,6 +33,7 @@ pub struct FlussConnection {
3233
network_connects: Arc<RpcClient>,
3334
args: Config,
3435
writer_client: RwLock<Option<Arc<WriterClient>>>,
36+
lookup_client: RwLock<Option<Arc<LookupClient>>>,
3537
}
3638

3739
impl FlussConnection {
@@ -48,6 +50,7 @@ impl FlussConnection {
4850
network_connects: connections.clone(),
4951
args: arg.clone(),
5052
writer_client: Default::default(),
53+
lookup_client: Default::default(),
5154
})
5255
}
5356

@@ -78,6 +81,18 @@ impl FlussConnection {
7881
Ok(client)
7982
}
8083

84+
/// Gets or creates a lookup client for batched lookup operations.
85+
pub fn get_or_create_lookup_client(&self) -> Result<Arc<LookupClient>> {
86+
if let Some(client) = self.lookup_client.read().as_ref() {
87+
return Ok(client.clone());
88+
}
89+
90+
// If not exists, create new one
91+
let client = Arc::new(LookupClient::new(&self.args, self.metadata.clone()));
92+
*self.lookup_client.write() = Some(client.clone());
93+
Ok(client)
94+
}
95+
8196
pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
8297
self.metadata.update_table_metadata(table_path).await?;
8398
let table_info = self.metadata.get_cluster().get_table(table_path).clone();
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Lookup client that batches multiple lookups together for improved throughput.
19+
//!
20+
//! This client achieves parity with the Java client by:
21+
//! - Queuing lookup operations instead of sending them immediately
22+
//! - Batching multiple lookups to the same server/bucket
23+
//! - Running a background sender task to process batches
24+
25+
use super::{LookupQuery, LookupQueue};
26+
use crate::client::lookup::lookup_sender::LookupSender;
27+
use crate::client::metadata::Metadata;
28+
use crate::config::Config;
29+
use crate::error::{Error, Result};
30+
use crate::metadata::{TableBucket, TablePath};
31+
use log::{debug, error};
32+
use std::sync::Arc;
33+
use std::time::Duration;
34+
use tokio::sync::mpsc;
35+
use tokio::task::JoinHandle;
36+
37+
/// A client that lookups values from the server with batching support.
38+
///
39+
/// The lookup client uses a queue and background sender to batch multiple
40+
/// lookup operations together, reducing network round trips and improving
41+
/// throughput.
42+
///
43+
/// # Example
44+
///
45+
/// ```ignore
46+
/// let lookup_client = LookupClient::new(config, metadata);
47+
/// let result = lookup_client.lookup(table_path, table_bucket, key_bytes).await?;
48+
/// ```
49+
pub struct LookupClient {
50+
/// Channel to send lookup requests to the queue
51+
lookup_tx: mpsc::Sender<LookupQuery>,
52+
/// Handle to the sender task
53+
sender_handle: Option<JoinHandle<()>>,
54+
/// Shutdown signal sender
55+
shutdown_tx: mpsc::Sender<()>,
56+
}
57+
58+
impl LookupClient {
59+
/// Creates a new lookup client.
60+
pub fn new(config: &Config, metadata: Arc<Metadata>) -> Self {
61+
// Extract configuration values
62+
let queue_size = config.lookup_queue_size;
63+
let max_batch_size = config.lookup_max_batch_size;
64+
let batch_timeout_ms = config.lookup_batch_timeout_ms;
65+
let max_inflight = config.lookup_max_inflight_requests;
66+
let max_retries = config.lookup_max_retries;
67+
68+
// Create queue and channels
69+
let (queue, lookup_tx, re_enqueue_tx) =
70+
LookupQueue::new(queue_size, max_batch_size, batch_timeout_ms);
71+
72+
// Create sender
73+
let mut sender =
74+
LookupSender::new(metadata, queue, re_enqueue_tx, max_inflight, max_retries);
75+
76+
// Create shutdown channel
77+
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
78+
79+
// Spawn sender task
80+
let sender_handle = tokio::spawn(async move {
81+
tokio::select! {
82+
_ = sender.run() => {
83+
debug!("Lookup sender completed");
84+
}
85+
_ = shutdown_rx.recv() => {
86+
debug!("Lookup sender received shutdown signal");
87+
sender.initiate_close();
88+
}
89+
}
90+
});
91+
92+
Self {
93+
lookup_tx,
94+
sender_handle: Some(sender_handle),
95+
shutdown_tx,
96+
}
97+
}
98+
99+
/// Looks up a value by its primary key.
100+
///
101+
/// This method queues the lookup operation and returns a future that will
102+
/// complete when the server responds. Multiple lookups may be batched together
103+
/// for improved throughput.
104+
///
105+
/// # Arguments
106+
/// * `table_path` - The table path
107+
/// * `table_bucket` - The table bucket
108+
/// * `key_bytes` - The encoded primary key bytes
109+
///
110+
/// # Returns
111+
/// * `Ok(Some(bytes))` - The value bytes if found
112+
/// * `Ok(None)` - If the key was not found
113+
/// * `Err(Error)` - If the lookup fails
114+
pub async fn lookup(
115+
&self,
116+
table_path: TablePath,
117+
table_bucket: TableBucket,
118+
key_bytes: Vec<u8>,
119+
) -> Result<Option<Vec<u8>>> {
120+
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
121+
122+
let query = LookupQuery::new(table_path, table_bucket, key_bytes, result_tx);
123+
124+
// Send to queue
125+
self.lookup_tx
126+
.send(query)
127+
.await
128+
.map_err(|_| Error::UnexpectedError {
129+
message: "Failed to queue lookup: channel closed".to_string(),
130+
source: None,
131+
})?;
132+
133+
// Wait for result
134+
result_rx.await.map_err(|_| Error::UnexpectedError {
135+
message: "Lookup result channel closed".to_string(),
136+
source: None,
137+
})?
138+
}
139+
140+
/// Closes the lookup client gracefully.
141+
pub async fn close(mut self, timeout: Duration) {
142+
debug!("Closing lookup client");
143+
144+
// Send shutdown signal
145+
let _ = self.shutdown_tx.send(()).await;
146+
147+
// Wait for sender to complete with timeout
148+
if let Some(handle) = self.sender_handle.take() {
149+
debug!("Waiting for sender task to complete...");
150+
let abort_handle = handle.abort_handle();
151+
152+
match tokio::time::timeout(timeout, handle).await {
153+
Ok(Ok(())) => {
154+
debug!("Lookup sender task completed gracefully.");
155+
}
156+
Ok(Err(join_error)) => {
157+
error!("Lookup sender task panicked: {:?}", join_error);
158+
}
159+
Err(_elapsed) => {
160+
error!("Lookup sender task did not complete within timeout. Forcing shutdown.");
161+
abort_handle.abort();
162+
}
163+
}
164+
} else {
165+
debug!("Lookup client was already closed or never initialized properly.");
166+
}
167+
168+
debug!("Lookup client closed");
169+
}
170+
}
171+
172+
impl Drop for LookupClient {
173+
fn drop(&mut self) {
174+
// Abort the sender task on drop if it wasn't already consumed by close()
175+
if let Some(handle) = self.sender_handle.take() {
176+
handle.abort();
177+
}
178+
}
179+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Lookup query representation for batching lookup operations.
19+
20+
use crate::metadata::{TableBucket, TablePath};
21+
use std::sync::atomic::{AtomicI32, Ordering};
22+
use tokio::sync::oneshot;
23+
24+
/// Represents a single lookup query that will be batched and sent to the server.
25+
pub struct LookupQuery {
26+
/// The table path for this lookup
27+
table_path: TablePath,
28+
/// The table bucket for this lookup
29+
table_bucket: TableBucket,
30+
/// The encoded primary key bytes
31+
key: Vec<u8>,
32+
/// Channel to send the result back to the caller
33+
result_tx: Option<oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>>,
34+
/// Number of retry attempts
35+
retries: AtomicI32,
36+
}
37+
38+
impl LookupQuery {
39+
/// Creates a new lookup query.
40+
pub fn new(
41+
table_path: TablePath,
42+
table_bucket: TableBucket,
43+
key: Vec<u8>,
44+
result_tx: oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>,
45+
) -> Self {
46+
Self {
47+
table_path,
48+
table_bucket,
49+
key,
50+
result_tx: Some(result_tx),
51+
retries: AtomicI32::new(0),
52+
}
53+
}
54+
55+
/// Returns the table path.
56+
#[allow(dead_code)]
57+
pub fn table_path(&self) -> &TablePath {
58+
&self.table_path
59+
}
60+
61+
/// Returns the table bucket.
62+
pub fn table_bucket(&self) -> &TableBucket {
63+
&self.table_bucket
64+
}
65+
66+
/// Returns the encoded key bytes.
67+
#[allow(dead_code)]
68+
pub fn key(&self) -> &[u8] {
69+
&self.key
70+
}
71+
72+
/// Takes ownership of the key bytes.
73+
pub fn take_key(&mut self) -> Vec<u8> {
74+
std::mem::take(&mut self.key)
75+
}
76+
77+
/// Returns the current retry count.
78+
pub fn retries(&self) -> i32 {
79+
self.retries.load(Ordering::Acquire)
80+
}
81+
82+
/// Increments the retry counter.
83+
pub fn increment_retries(&self) {
84+
self.retries.fetch_add(1, Ordering::AcqRel);
85+
}
86+
87+
/// Completes the lookup with a result.
88+
pub fn complete(&mut self, result: Result<Option<Vec<u8>>, crate::error::Error>) {
89+
if let Some(tx) = self.result_tx.take() {
90+
let _ = tx.send(result);
91+
}
92+
}
93+
94+
/// Returns true if the result has already been sent.
95+
pub fn is_done(&self) -> bool {
96+
self.result_tx.is_none()
97+
}
98+
}

0 commit comments

Comments
 (0)