Skip to content

Commit d72fe4e

Browse files
committed
feat: support batch lookup
1 parent 0b53d37 commit d72fe4e

12 files changed

Lines changed: 1283 additions & 64 deletions

File tree

crates/fluss/src/client/connection.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use crate::client::WriterClient;
1919
use crate::client::admin::FlussAdmin;
20+
use crate::client::lookup::LookupClient;
2021
use crate::client::metadata::Metadata;
2122
use crate::client::table::FlussTable;
2223
use crate::config::Config;
@@ -32,6 +33,7 @@ pub struct FlussConnection {
3233
network_connects: Arc<RpcClient>,
3334
args: Config,
3435
writer_client: RwLock<Option<Arc<WriterClient>>>,
36+
lookup_client: RwLock<Option<Arc<LookupClient>>>,
3537
}
3638

3739
impl FlussConnection {
@@ -48,6 +50,7 @@ impl FlussConnection {
4850
network_connects: connections.clone(),
4951
args: arg.clone(),
5052
writer_client: Default::default(),
53+
lookup_client: Default::default(),
5154
})
5255
}
5356

@@ -90,6 +93,30 @@ impl FlussConnection {
9093
Ok(new_client)
9194
}
9295

96+
/// Gets or creates a lookup client for batched lookup operations.
97+
pub fn get_or_create_lookup_client(&self) -> Result<Arc<LookupClient>> {
98+
// 1. Fast path: Attempt to acquire a read lock to check if the client already exists.
99+
if let Some(client) = self.lookup_client.read().as_ref() {
100+
return Ok(client.clone());
101+
}
102+
103+
// 2. Slow path: Acquire the write lock.
104+
let mut lookup_guard = self.lookup_client.write();
105+
106+
// 3. Double-check: Another thread might have initialized the client
107+
// while this thread was waiting for the write lock.
108+
if let Some(client) = lookup_guard.as_ref() {
109+
return Ok(client.clone());
110+
}
111+
112+
// 4. Initialize the client since we are certain it doesn't exist yet.
113+
let new_client = Arc::new(LookupClient::new(&self.args, self.metadata.clone()));
114+
115+
// 5. Store and return the newly created client.
116+
*lookup_guard = Some(new_client.clone());
117+
Ok(new_client)
118+
}
119+
93120
pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
94121
self.metadata.update_table_metadata(table_path).await?;
95122
let table_info = self.metadata.get_cluster().get_table(table_path)?.clone();
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Lookup client that batches multiple lookups together for improved throughput.
19+
//!
20+
//! This client achieves parity with the Java client by:
21+
//! - Queuing lookup operations instead of sending them immediately
22+
//! - Batching multiple lookups to the same server/bucket
23+
//! - Running a background sender task to process batches
24+
25+
use super::{LookupQuery, LookupQueue};
26+
use crate::client::lookup::lookup_sender::LookupSender;
27+
use crate::client::metadata::Metadata;
28+
use crate::config::Config;
29+
use crate::error::{Error, Result};
30+
use crate::metadata::{TableBucket, TablePath};
31+
use log::{debug, error};
32+
use std::sync::Arc;
33+
use std::sync::atomic::{AtomicBool, Ordering};
34+
use std::time::Duration;
35+
use tokio::sync::mpsc;
36+
use tokio::task::JoinHandle;
37+
38+
/// A client that lookups values from the server with batching support.
39+
///
40+
/// The lookup client uses a queue and background sender to batch multiple
41+
/// lookup operations together, reducing network round trips and improving
42+
/// throughput.
43+
///
44+
/// # Example
45+
///
46+
/// ```ignore
47+
/// let lookup_client = LookupClient::new(config, metadata);
48+
/// let result = lookup_client.lookup(table_path, table_bucket, key_bytes).await?;
49+
/// ```
50+
pub struct LookupClient {
51+
/// Channel to send lookup requests to the queue
52+
lookup_tx: mpsc::Sender<LookupQuery>,
53+
/// Handle to the sender task
54+
sender_handle: Option<JoinHandle<()>>,
55+
/// Shutdown signal sender
56+
shutdown_tx: mpsc::Sender<()>,
57+
/// Whether the client is closed
58+
closed: AtomicBool,
59+
}
60+
61+
impl LookupClient {
62+
/// Creates a new lookup client.
63+
pub fn new(config: &Config, metadata: Arc<Metadata>) -> Self {
64+
// Extract configuration values
65+
let queue_size = config.lookup_queue_size;
66+
let max_batch_size = config.lookup_max_batch_size;
67+
let batch_timeout_ms = config.lookup_batch_timeout_ms;
68+
let max_inflight = config.lookup_max_inflight_requests;
69+
let max_retries = config.lookup_max_retries;
70+
71+
// Create queue and channels
72+
let (queue, lookup_tx, re_enqueue_tx) =
73+
LookupQueue::new(queue_size, max_batch_size, batch_timeout_ms);
74+
75+
// Create sender
76+
let mut sender =
77+
LookupSender::new(metadata, queue, re_enqueue_tx, max_inflight, max_retries);
78+
79+
// Create shutdown channel
80+
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
81+
82+
// Spawn sender task
83+
let sender_handle = tokio::spawn(async move {
84+
tokio::select! {
85+
_ = sender.run() => {
86+
debug!("Lookup sender completed");
87+
}
88+
_ = shutdown_rx.recv() => {
89+
debug!("Lookup sender received shutdown signal");
90+
sender.initiate_close();
91+
}
92+
}
93+
});
94+
95+
Self {
96+
lookup_tx,
97+
sender_handle: Some(sender_handle),
98+
shutdown_tx,
99+
closed: AtomicBool::new(false),
100+
}
101+
}
102+
103+
/// Looks up a value by its primary key.
104+
///
105+
/// This method queues the lookup operation and returns a future that will
106+
/// complete when the server responds. Multiple lookups may be batched together
107+
/// for improved throughput.
108+
///
109+
/// # Arguments
110+
/// * `table_path` - The table path
111+
/// * `table_bucket` - The table bucket
112+
/// * `key_bytes` - The encoded primary key bytes
113+
///
114+
/// # Returns
115+
/// * `Ok(Some(bytes))` - The value bytes if found
116+
/// * `Ok(None)` - If the key was not found
117+
/// * `Err(Error)` - If the lookup fails
118+
pub async fn lookup(
119+
&self,
120+
table_path: TablePath,
121+
table_bucket: TableBucket,
122+
key_bytes: Vec<u8>,
123+
) -> Result<Option<Vec<u8>>> {
124+
// Check if the client is closed
125+
if self.closed.load(Ordering::Acquire) {
126+
return Err(Error::UnexpectedError {
127+
message: "Lookup client is closed".to_string(),
128+
source: None,
129+
});
130+
}
131+
132+
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
133+
134+
let query = LookupQuery::new(table_path, table_bucket, key_bytes, result_tx);
135+
136+
// Send to queue
137+
self.lookup_tx
138+
.send(query)
139+
.await
140+
.map_err(|_| Error::UnexpectedError {
141+
message: "Failed to queue lookup: channel closed".to_string(),
142+
source: None,
143+
})?;
144+
145+
// Wait for result
146+
result_rx.await.map_err(|_| Error::UnexpectedError {
147+
message: "Lookup result channel closed".to_string(),
148+
source: None,
149+
})?
150+
}
151+
152+
/// Closes the lookup client gracefully.
153+
pub async fn close(mut self, timeout: Duration) {
154+
debug!("Closing lookup client");
155+
156+
// Mark as closed to reject new lookups
157+
self.closed.store(true, Ordering::Release);
158+
159+
// Send shutdown signal
160+
let _ = self.shutdown_tx.send(()).await;
161+
162+
// Wait for sender to complete with timeout
163+
if let Some(handle) = self.sender_handle.take() {
164+
debug!("Waiting for sender task to complete...");
165+
let abort_handle = handle.abort_handle();
166+
167+
match tokio::time::timeout(timeout, handle).await {
168+
Ok(Ok(())) => {
169+
debug!("Lookup sender task completed gracefully.");
170+
}
171+
Ok(Err(join_error)) => {
172+
error!("Lookup sender task panicked: {:?}", join_error);
173+
}
174+
Err(_elapsed) => {
175+
error!("Lookup sender task did not complete within timeout. Forcing shutdown.");
176+
abort_handle.abort();
177+
}
178+
}
179+
} else {
180+
debug!("Lookup client was already closed or never initialized properly.");
181+
}
182+
183+
debug!("Lookup client closed");
184+
}
185+
}
186+
187+
impl Drop for LookupClient {
188+
fn drop(&mut self) {
189+
// Abort the sender task on drop if it wasn't already consumed by close()
190+
if let Some(handle) = self.sender_handle.take() {
191+
handle.abort();
192+
}
193+
}
194+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Lookup query representation for batching lookup operations.
19+
20+
use crate::metadata::{TableBucket, TablePath};
21+
use std::sync::atomic::{AtomicI32, Ordering};
22+
use tokio::sync::oneshot;
23+
24+
/// Represents a single lookup query that will be batched and sent to the server.
25+
pub struct LookupQuery {
26+
/// The table path for this lookup
27+
table_path: TablePath,
28+
/// The table bucket for this lookup
29+
table_bucket: TableBucket,
30+
/// The encoded primary key bytes
31+
key: Vec<u8>,
32+
/// Channel to send the result back to the caller
33+
result_tx: Option<oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>>,
34+
/// Number of retry attempts
35+
retries: AtomicI32,
36+
}
37+
38+
impl LookupQuery {
39+
/// Creates a new lookup query.
40+
pub fn new(
41+
table_path: TablePath,
42+
table_bucket: TableBucket,
43+
key: Vec<u8>,
44+
result_tx: oneshot::Sender<Result<Option<Vec<u8>>, crate::error::Error>>,
45+
) -> Self {
46+
Self {
47+
table_path,
48+
table_bucket,
49+
key,
50+
result_tx: Some(result_tx),
51+
retries: AtomicI32::new(0),
52+
}
53+
}
54+
55+
/// Returns the table path.
56+
#[allow(dead_code)]
57+
pub fn table_path(&self) -> &TablePath {
58+
&self.table_path
59+
}
60+
61+
/// Returns the table bucket.
62+
pub fn table_bucket(&self) -> &TableBucket {
63+
&self.table_bucket
64+
}
65+
66+
/// Returns the encoded key bytes.
67+
pub fn key(&self) -> &[u8] {
68+
&self.key
69+
}
70+
71+
/// Returns the current retry count.
72+
pub fn retries(&self) -> i32 {
73+
self.retries.load(Ordering::Acquire)
74+
}
75+
76+
/// Increments the retry counter.
77+
pub fn increment_retries(&self) {
78+
self.retries.fetch_add(1, Ordering::AcqRel);
79+
}
80+
81+
/// Completes the lookup with a result.
82+
pub fn complete(&mut self, result: Result<Option<Vec<u8>>, crate::error::Error>) {
83+
if let Some(tx) = self.result_tx.take() {
84+
let _ = tx.send(result);
85+
}
86+
}
87+
88+
/// Returns true if the result has already been sent.
89+
pub fn is_done(&self) -> bool {
90+
self.result_tx.is_none()
91+
}
92+
}

0 commit comments

Comments
 (0)