Skip to content

Commit 0166a00

Browse files
authored
Merge pull request #50 from Embucket/sql_extention
Support new datatypes and UDFs
2 parents: 08a5942 + 6542852 — commit 0166a00

14 files changed

Lines changed: 308 additions & 221 deletions

File tree

crates/catalog/src/models.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
use std::collections::HashMap;
2-
use std::fmt::Display;
32
use uuid::Uuid;
43

54
pub use iceberg::{Namespace, NamespaceIdent, TableCreation, TableRequirement, TableUpdate};

crates/catalog/src/repository.rs

Lines changed: 1 addition & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,8 @@
11
use async_trait::async_trait;
22
use std::fmt::{Display, Formatter};
33
use std::sync::Arc;
4-
use uuid::Uuid;
54

65
use utils::Db;
7-
use utils::{Entity, Repository};
86

97
use crate::error::Result;
108
use crate::models::{Database, DatabaseIdent, Table, TableIdent, WarehouseIdent};
@@ -244,6 +242,7 @@ mod tests {
244242
},
245243
metadata_location: "s3://bucket/path".to_string(),
246244
metadata: create_table_metadata(),
245+
properties: Default::default(),
247246
};
248247

249248
repo.put(&table).await.expect("failed to create table");

crates/catalog/src/service.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@ use async_trait::async_trait;
99
use bytes::Bytes;
1010
use chrono::Utc;
1111
use control_plane::models::{StorageProfile, Warehouse};
12-
use control_plane::service::ControlService;
1312
use iceberg::spec::{FormatVersion, TableMetadata};
1413
use iceberg::{spec::TableMetadataBuilder, TableCreation};
1514
use object_store::path::Path;
@@ -326,7 +325,7 @@ impl Catalog for CatalogImpl {
326325
&self,
327326
namespace: &DatabaseIdent,
328327
storage_profile: &StorageProfile,
329-
warehouse: &Warehouse,
328+
_warehouse: &Warehouse,
330329
table_name: String,
331330
metadata_location: String,
332331
properties: Option<HashMap<String, String>>,
@@ -871,7 +870,6 @@ mod tests {
871870
ident: res.unwrap().ident,
872871
requirements: vec![],
873872
updates: vec![update],
874-
properties: None,
875873
};
876874

877875
let res = service.update_table(commit).await;

crates/control_plane/Cargo.toml

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,9 @@ utils = { path = "../utils" }
1313
futures = { workspace = true }
1414
serde = { workspace = true }
1515
datafusion = { workspace = true }
16-
iceberg-rust = { git = "https://github.com/JanKaul/iceberg-rust.git"}
17-
iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git"}
18-
datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git"}
16+
iceberg-rust = { git = "https://github.com/JanKaul/iceberg-rust.git" }
17+
iceberg-rest-catalog = { git = "https://github.com/JanKaul/iceberg-rust.git" }
18+
datafusion_iceberg = { git = "https://github.com/JanKaul/iceberg-rust.git" }
1919
arrow = { version = "53" }
2020
arrow-json = { version = "53" }
2121
datafusion-functions-json = { version = "0.43.0" }
@@ -28,6 +28,9 @@ quick-xml = { version = "0.36.2" }
2828
icelake = { git = "https://github.com/Embucket/icelake.git" }
2929
bytes = { version = "1.8.0" }
3030
url = { version = "2.5.2" }
31+
serde_json = "1.0.128"
32+
once_cell = "1.19.0"
33+
regex = "1.11.0"
3134

3235
[dev-dependencies]
3336
slatedb = { workspace = true }

crates/control_plane/src/service.rs

Lines changed: 15 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
use datafusion_iceberg::planner::IcebergQueryPlanner;
21
use crate::error::{extract_error_message, Error, Result};
32
use crate::models::{Credentials, StorageProfile, StorageProfileCreateRequest};
43
use crate::models::{Warehouse, WarehouseCreateRequest};
@@ -10,7 +9,6 @@ use async_trait::async_trait;
109
use bytes::Bytes;
1110
use datafusion::execution::context::SessionContext;
1211
use datafusion::prelude::CsvReadOptions;
13-
use datafusion::execution::SessionStateBuilder;
1412
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
1513
use iceberg_rest_catalog::apis::configuration::Configuration;
1614
use iceberg_rest_catalog::catalog::RestCatalog;
@@ -20,7 +18,6 @@ use object_store::{ObjectStore, PutPayload};
2018
use rusoto_core::{HttpClient, Region};
2119
use rusoto_credential::StaticProvider;
2220
use rusoto_s3::{GetBucketAclRequest, S3Client, S3};
23-
use std::any::Any;
2421
use std::collections::HashMap;
2522
use std::sync::Arc;
2623
use url::Url;
@@ -174,7 +171,7 @@ impl ControlService for ControlServiceImpl {
174171
&self,
175172
warehouse_id: &Uuid,
176173
database_name: &String,
177-
table_name: &String,
174+
_table_name: &String,
178175
query: &String,
179176
) -> Result<String> {
180177
let warehouse = self.get_warehouse(*warehouse_id).await?;
@@ -266,12 +263,9 @@ impl ControlService for ControlServiceImpl {
266263
};
267264
let endpoint_url = Url::parse(storage_profile.endpoint.clone().unwrap().as_str())
268265
.map_err(|e| Error::DataFusionError(format!("Invalid endpoint URL: {}", e)))?;
269-
270266
ctx.register_object_store(&endpoint_url, Arc::from(object_store));
271-
let df = ctx.read_csv(path_string, CsvReadOptions::new()).await?;
272-
let data = df.collect().await?;
273267

274-
println!("{:?}", data);
268+
// println!("{:?}", data);
275269
// Commented code is writing with iceberg-rust-jankaul
276270
// Let it sit here just in case
277271
//////////////////////////////////////
@@ -337,22 +331,22 @@ impl ControlService for ControlServiceImpl {
337331
// ),
338332
])
339333
};
340-
let catalog = icelake::catalog::load_catalog(&config).await.unwrap();
341-
342-
let table_ident = TableIdentifier::new(vec![database_name, table_name]).unwrap();
343-
let mut table = catalog.load_table(&table_ident).await.unwrap();
334+
let catalog = icelake::catalog::load_catalog(&config).await?;
335+
let table_ident = TableIdentifier::new(vec![database_name, table_name])?;
336+
let mut table = catalog.load_table(&table_ident).await?;
337+
let table_schema = table.current_arrow_schema()?;
344338
println!("{:?}", table.table_name());
339+
340+
let df = ctx.read_csv(path_string, CsvReadOptions::new().schema(&*table_schema)).await?;
341+
let data = df.collect().await?;
342+
345343
let builder = table
346-
.writer_builder()
347-
.unwrap()
348-
.rolling_writer_builder(None)
349-
.unwrap();
344+
.writer_builder()?
345+
.rolling_writer_builder(None)?;
350346
let mut writer = table
351-
.writer_builder()
352-
.unwrap()
347+
.writer_builder()?
353348
.build_append_only_writer(builder)
354-
.await
355-
.unwrap();
349+
.await?;
356350

357351
for r in data {
358352
writer.write(&r).await?;
@@ -361,7 +355,7 @@ impl ControlService for ControlServiceImpl {
361355
let res: Vec<icelake::types::DataFile> = writer.close().await?;
362356
let mut txn = icelake::transaction::Transaction::new(&mut table);
363357
txn.append_data_file(res);
364-
txn.commit().await.unwrap();
358+
txn.commit().await?;
365359

366360
Ok(())
367361
}

crates/control_plane/src/sql/functions/common.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@ pub fn first_non_empty_type(union_array: &UnionArray) -> Option<(DataType, Array
1717
None
1818
}
1919

20-
2120
pub fn convert_record_batches(records: Vec<RecordBatch>) -> DataFusionResult<Vec<RecordBatch>> {
2221
let mut converted_batches = Vec::new();
2322

0 commit comments

Comments (0)