forked from returnString/convergence
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatafusion.rs
More file actions
60 lines (52 loc) · 1.63 KB
/
datafusion.rs
File metadata and controls
60 lines (52 loc) · 1.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
use convergence::server::{self, BindOptions};
use convergence_arrow::datafusion::DataFusionEngine;
use convergence_arrow::metadata::Catalog;
use datafusion::arrow::datatypes::DataType;
use datafusion::catalog_common::memory::MemorySchemaProvider;
use datafusion::catalog::CatalogProvider;
use datafusion::catalog_common::MemoryCatalogProvider;
use datafusion::logical_expr::Volatility;
use datafusion::physical_plan::ColumnarValue;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use std::sync::Arc;
async fn new_engine() -> DataFusionEngine {
let ctx = SessionContext::new_with_config(
SessionConfig::new()
.with_information_schema(true)
.with_create_default_catalog_and_schema(false),
);
let mem_catalog = Arc::new(MemoryCatalogProvider::new());
mem_catalog
.register_schema("public", Arc::new(MemorySchemaProvider::new()))
.expect("failed to register schema");
ctx.register_catalog("datafusion", Arc::new(Catalog::new(mem_catalog)));
ctx.register_csv(
"test_100_4buckets",
"convergence-arrow/data/100_4buckets.csv",
CsvReadOptions::new(),
)
.await
.expect("failed to register csv");
ctx.register_udf(create_udf(
"pg_backend_pid",
vec![],
DataType::Int32,
Volatility::Stable,
Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(0))))),
));
ctx.register_udf(create_udf(
"current_schema",
vec![],
DataType::Utf8,
Volatility::Stable,
Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some("public".to_owned()))))),
));
DataFusionEngine::new(ctx)
}
#[tokio::main]
async fn main() {
server::run(BindOptions::new(), Arc::new(|| Box::pin(new_engine())))
.await
.unwrap();
}