|
1 | | -# Apache Fluss™ Rust Client (Incubating) |
| 1 | +# Apache Fluss (Incubating) Official Rust Client |
2 | 2 |
|
3 | | -Rust client library for [Apache Fluss™](https://fluss.apache.org/). This crate provides the core client used by the fluss-rust workspace and by the Python and C++ bindings. |
| 3 | +Official Rust client library for [Apache Fluss (Incubating)](https://fluss.apache.org/). |
4 | 4 |
|
5 | | -# Todo: move how to use to the first, and how to build to the last, https://github.com/apache/opendal/blob/main/core/README.md |
6 | | -# is a good reference |
| 5 | +[![Crates.io](https://img.shields.io/crates/v/fluss-rs.svg)](https://crates.io/crates/fluss-rs)
| 6 | +[![Docs.rs](https://docs.rs/fluss-rs/badge.svg)](https://docs.rs/fluss-rs/)
7 | 7 |
|
8 | | -## Requirements |
| 8 | +## Usage |
9 | 9 |
|
10 | | -- Rust (see [rust-toolchain.toml](../../rust-toolchain.toml) at repo root) |
11 | | -- protobuf (for build) |
| 10 | +The following example shows both **primary key (KV) tables** and **log tables** in one flow: connect, create a KV table (upsert + lookup), then create a log table (append + scan). |
12 | 11 |
|
13 | | -## Build |
| 12 | +```rust |
| 13 | +use fluss::client::EARLIEST_OFFSET; |
| 14 | +use fluss::client::FlussConnection; |
| 15 | +use fluss::config::Config; |
| 16 | +use fluss::error::Result; |
| 17 | +use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; |
| 18 | +use fluss::row::{GenericRow, InternalRow}; |
| 19 | +use std::time::Duration; |
14 | 20 |
|
15 | | -From the repository root: |
| 21 | +#[tokio::main] |
| 22 | +async fn main() -> Result<()> { |
| 23 | + let mut config = Config::default(); |
| 24 | + config.bootstrap_servers = "127.0.0.1:9123".to_string(); |
| 25 | + let connection = FlussConnection::new(config).await?; |
| 26 | + let admin = connection.get_admin().await?; |
16 | 27 |
|
17 | | -```bash |
18 | | -cargo build -p fluss-rs |
| 28 | + // ---- Primary key (KV) table: upsert and lookup ---- |
| 29 | + let kv_path = TablePath::new("fluss", "users"); |
| 30 | + let mut kv_schema = Schema::builder() |
| 31 | + .column("id", DataTypes::int()) |
| 32 | + .column("name", DataTypes::string()) |
| 33 | + .column("age", DataTypes::bigint()) |
| 34 | + .primary_key(vec!["id"]); |
| 35 | + let kv_descriptor = TableDescriptor::builder() |
| 36 | + .schema(kv_schema.build()?) |
| 37 | + .build()?; |
| 38 | + admin.create_table(&kv_path, &kv_descriptor, false).await?; |
| 39 | + |
| 40 | + let kv_table = connection.get_table(&kv_path).await?; |
| 41 | + let upsert_writer = kv_table.new_upsert()?.create_writer()?; |
| 42 | + let mut row = GenericRow::new(3); |
| 43 | + row.set_field(0, 1i32); |
| 44 | + row.set_field(1, "Alice"); |
| 45 | + row.set_field(2, 30i64); |
| 46 | + upsert_writer.upsert(&row)?; |
| 47 | + upsert_writer.flush().await?; |
| 48 | + |
| 49 | + let mut lookuper = kv_table.new_lookup()?.create_lookuper()?; |
| 50 | + let mut key = GenericRow::new(1); |
| 51 | + key.set_field(0, 1i32); |
| 52 | + let result = lookuper.lookup(&key).await?; |
| 53 | + if let Some(r) = result.get_single_row()? { |
| 54 | + println!("KV lookup: id={}, name={}, age={}", |
| 55 | + r.get_int(0)?, r.get_string(1)?, r.get_long(2)?); |
| 56 | + } |
| 57 | + |
| 58 | + // ---- Log table: append and scan ---- |
| 59 | + let log_path = TablePath::new("fluss", "events"); |
| 60 | + let mut log_schema_builder = Schema::builder() |
| 61 | + .column("ts", DataTypes::bigint()) |
| 62 | + .column("message", DataTypes::string()); |
| 63 | + let log_descriptor = TableDescriptor::builder() |
| 64 | + .schema(log_schema_builder.build()?) |
| 65 | + .build()?; |
| 66 | + admin.create_table(&log_path, &log_descriptor, false).await?; |
| 67 | + |
| 68 | + let log_table = connection.get_table(&log_path).await?; |
| 69 | + let append_writer = log_table.new_append()?.create_writer()?; |
| 70 | + let mut event = GenericRow::new(2); |
| 71 | + event.set_field(0, 1700000000i64); |
| 72 | + event.set_field(1, "hello"); |
| 73 | + append_writer.append(&event)?; |
| 74 | + append_writer.flush().await?; |
| 75 | + |
| 76 | + let scanner = log_table.new_scan().create_log_scanner()?; |
| 77 | + scanner.subscribe(0, EARLIEST_OFFSET).await?; |
| 78 | + let scan_records = scanner.poll(Duration::from_secs(1)).await?; |
| 79 | + for record in scan_records { |
| 80 | + let r = record.row(); |
| 81 | + println!("Log scan: ts={}, message={}", r.get_long(0)?, r.get_string(1)?); |
| 82 | + } |
| 83 | + |
| 84 | + Ok(()) |
| 85 | +} |
19 | 86 | ``` |
20 | 87 |
|
21 | | -## Quick start and examples |
| 88 | +## Storage Support |
| 89 | + |
| 90 | +The Fluss client reads remote data by accessing Fluss’s **remote files** (e.g., log segments and snapshots) directly. The following **remote file systems** are supported; enable the matching feature(s) for your deployment:
| 91 | + |
| 92 | +| Storage Backend | Feature Flag | Status | Description | |
| 93 | +|----------------|--------------|--------|-------------| |
| 94 | +| Local Filesystem | `storage-fs` | ✅ Stable | Local filesystem storage | |
| 95 | +| Amazon S3 | `storage-s3` | ✅ Stable | Amazon S3 storage | |
| 96 | +| Alibaba Cloud OSS | `storage-oss` | ✅ Stable | Alibaba Cloud Object Storage Service | |
22 | 97 |
|
23 | | -## TODO |
24 | | -- [ ] Expand API documentation and usage examples in this README. |
25 | | -- [ ] Add more examples for table, log scan, and write flows. |
| 98 | +You can enable all storage backends at once using the `storage-all` feature flag. |
| 99 | + |
| 100 | +Example usage in `Cargo.toml`:
| 101 | +```toml |
| 102 | +[dependencies] |
| 103 | +fluss-rs = { version = "0.1.0", features = ["storage-s3", "storage-fs"] } |
| 104 | +``` |
0 commit comments