From 1c0bdda4d930941da53731189c072cc1e56404d8 Mon Sep 17 00:00:00 2001 From: QP Hou Date: Thu, 2 Jan 2025 03:10:56 -0800 Subject: [PATCH 01/10] update datafusion to 43 (#295) * update datafusion to 43 * fix test --- convergence-arrow/Cargo.toml | 2 +- convergence-arrow/examples/datafusion.rs | 9 +++++---- convergence-arrow/src/metadata.rs | 4 +++- convergence/src/protocol_ext.rs | 2 +- convergence/tests/test_connection.rs | 6 +++--- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index 3d483b5..3755a14 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/returnString/convergence" [dependencies] tokio = { version = "1" } async-trait = "0.1" -datafusion = "38" +datafusion = "43" convergence = { path = "../convergence", version = "0.16.0" } chrono = "0.4" diff --git a/convergence-arrow/examples/datafusion.rs b/convergence-arrow/examples/datafusion.rs index aa4591e..d1c40f3 100644 --- a/convergence-arrow/examples/datafusion.rs +++ b/convergence-arrow/examples/datafusion.rs @@ -2,8 +2,9 @@ use convergence::server::{self, BindOptions}; use convergence_arrow::datafusion::DataFusionEngine; use convergence_arrow::metadata::Catalog; use datafusion::arrow::datatypes::DataType; -use datafusion::catalog::schema::MemorySchemaProvider; -use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider}; +use datafusion::catalog_common::memory::MemorySchemaProvider; +use datafusion::catalog::CatalogProvider; +use datafusion::catalog_common::MemoryCatalogProvider; use datafusion::logical_expr::Volatility; use datafusion::physical_plan::ColumnarValue; use datafusion::prelude::*; @@ -35,7 +36,7 @@ async fn new_engine() -> DataFusionEngine { ctx.register_udf(create_udf( "pg_backend_pid", vec![], - Arc::new(DataType::Int32), + DataType::Int32, Volatility::Stable, Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(0))))), )); @@ -43,7 +44,7 @@ async fn new_engine() -> DataFusionEngine { ctx.register_udf(create_udf( "current_schema", vec![], - Arc::new(DataType::Utf8), + DataType::Utf8, Volatility::Stable, Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some("public".to_owned()))))), )); diff --git a/convergence-arrow/src/metadata.rs b/convergence-arrow/src/metadata.rs index ad63207..6417711 100644 --- a/convergence-arrow/src/metadata.rs +++ b/convergence-arrow/src/metadata.rs @@ -3,8 +3,9 @@ use datafusion::arrow::array::{ArrayRef, Int32Builder, StringBuilder, UInt32Builder}; use datafusion::arrow::datatypes::{Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::catalog::schema::{MemorySchemaProvider, SchemaProvider}; use datafusion::catalog::CatalogProvider; +use datafusion::catalog::SchemaProvider; +use datafusion::catalog_common::memory::MemorySchemaProvider; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::DataFusionError; use std::convert::TryInto; @@ -153,6 +154,7 @@ impl MetadataBuilder { } /// Wrapper catalog supporting generation of pg metadata (e.g. pg_catalog schema). +#[derive(Debug)] pub struct Catalog { wrapped: Arc, } diff --git a/convergence/src/protocol_ext.rs b/convergence/src/protocol_ext.rs index 575090f..b146775 100644 --- a/convergence/src/protocol_ext.rs +++ b/convergence/src/protocol_ext.rs @@ -138,7 +138,7 @@ impl<'a> DataRowWriter<'a> { primitive_write!(write_float8, f64); } -impl<'a> Drop for DataRowWriter<'a> { +impl Drop for DataRowWriter<'_> { fn drop(&mut self) { assert_eq!( self.parent.num_cols, self.current_col, diff --git a/convergence/tests/test_connection.rs b/convergence/tests/test_connection.rs index c234a57..1f23bdf 100644 --- a/convergence/tests/test_connection.rs +++ b/convergence/tests/test_connection.rs @@ -79,16 +79,16 @@ async fn extended_query_flow() { async fn simple_query_flow() { let client = setup().await; let messages = client.simple_query("select 1").await.unwrap(); - assert_eq!(messages.len(), 2); + assert_eq!(messages.len(), 3); - let row = match &messages[0] { + let row = match &messages[1] { SimpleQueryMessage::Row(row) => row, _ => panic!("expected row"), }; assert_eq!(row.get(0), Some("1")); - let num_rows = match &messages[1] { + let num_rows = match &messages[2] { SimpleQueryMessage::CommandComplete(rows) => *rows, _ => panic!("expected command complete"), }; From 08661074441362ed549a4507bf9a085276c8c7f4 Mon Sep 17 00:00:00 2001 From: Ruan Pearce-Authers Date: Mon, 28 Apr 2025 09:58:06 +0100 Subject: [PATCH 02/10] Update CI to use Ubuntu 24-04 --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index eeffb28..537f6ef 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -10,7 +10,7 @@ jobs: fail-fast: false matrix: os: - - ubuntu-20.04 + - ubuntu-24.04 toolchain: - 1.68.0 From cf77fc73bb8c457648599c8a6fcbcee1e580d25c Mon Sep 17 00:00:00 2001 From: Ruan Pearce-Authers Date: Mon, 28 Apr 2025 10:39:35 +0100 Subject: [PATCH 03/10] Upgrade DataFusion to 47 --- convergence-arrow/Cargo.toml | 2 +- convergence-arrow/examples/datafusion.rs | 5 ++--- convergence-arrow/src/metadata.rs | 5 ++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index 3755a14..6f9d4a5 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/returnString/convergence" [dependencies] tokio = { version = "1" } async-trait = "0.1" -datafusion = "43" +datafusion = "47" convergence = { path = "../convergence", version = "0.16.0" } chrono = "0.4" diff --git a/convergence-arrow/examples/datafusion.rs b/convergence-arrow/examples/datafusion.rs index d1c40f3..e5cdd0b 100644 --- a/convergence-arrow/examples/datafusion.rs +++ b/convergence-arrow/examples/datafusion.rs @@ -2,9 +2,8 @@ use convergence::server::{self, BindOptions}; use convergence_arrow::datafusion::DataFusionEngine; use convergence_arrow::metadata::Catalog; use datafusion::arrow::datatypes::DataType; -use datafusion::catalog_common::memory::MemorySchemaProvider; -use datafusion::catalog::CatalogProvider; -use datafusion::catalog_common::MemoryCatalogProvider; +use datafusion::catalog::memory::MemorySchemaProvider; +use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider}; use datafusion::logical_expr::Volatility; use datafusion::physical_plan::ColumnarValue; use datafusion::prelude::*; diff --git a/convergence-arrow/src/metadata.rs b/convergence-arrow/src/metadata.rs index 6417711..2bec6f2 100644 --- a/convergence-arrow/src/metadata.rs +++ b/convergence-arrow/src/metadata.rs @@ -3,9 +3,8 @@ use datafusion::arrow::array::{ArrayRef, Int32Builder, StringBuilder, UInt32Builder}; use datafusion::arrow::datatypes::{Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::catalog::CatalogProvider; -use datafusion::catalog::SchemaProvider; -use datafusion::catalog_common::memory::MemorySchemaProvider; +use datafusion::catalog::memory::MemorySchemaProvider; +use datafusion::catalog::{CatalogProvider, SchemaProvider}; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::DataFusionError; use std::convert::TryInto; From 4347f4fe1ff4deb8d81b7cddaa40aff9dd5234a5 Mon Sep 17 00:00:00 2001 From: Mike Shauneu Date: Fri, 25 Apr 2025 19:02:19 -0400 Subject: [PATCH 04/10] Add Utf8 support --- .gitignore | 2 ++ convergence-arrow/Cargo.toml | 2 +- convergence-arrow/src/table.rs | 5 +++-- convergence-arrow/tests/test_arrow.rs | 13 +++++++++---- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 96ef6c0..04fa3b6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /target Cargo.lock +.idea/ + diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index 6f9d4a5..c4e43cb 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -12,7 +12,7 @@ tokio = { version = "1" } async-trait = "0.1" datafusion = "47" convergence = { path = "../convergence", version = "0.16.0" } -chrono = "0.4" +chrono = "=0.4.39" [dev-dependencies] tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4" ] } diff --git a/convergence-arrow/src/table.rs b/convergence-arrow/src/table.rs index fb23e98..34f9740 100644 --- a/convergence-arrow/src/table.rs +++ b/convergence-arrow/src/table.rs @@ -4,7 +4,7 @@ use convergence::protocol::{DataTypeOid, ErrorResponse, FieldDescription, SqlSta use convergence::protocol_ext::DataRowBatch; use datafusion::arrow::array::{ BooleanArray, Date32Array, Date64Array, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, - Int64Array, Int8Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + Int64Array, Int8Array, StringArray, StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use datafusion::arrow::datatypes::{DataType, Schema, TimeUnit}; @@ -48,6 +48,7 @@ pub fn record_batch_to_rows(arrow_batch: &RecordBatch, pg_batch: &mut DataRowBat DataType::Float32 => row.write_float4(array_val!(Float32Array, col, row_idx)), DataType::Float64 => row.write_float8(array_val!(Float64Array, col, row_idx)), DataType::Utf8 => row.write_string(array_val!(StringArray, col, row_idx)), + DataType::Utf8View => row.write_string(array_val!(StringViewArray, col, row_idx)), DataType::Date32 => { row.write_date(array_val!(Date32Array, col, row_idx, value_as_date).ok_or_else(|| { ErrorResponse::error(SqlState::InvalidDatetimeFormat, "unsupported date type") @@ -102,7 +103,7 @@ pub fn data_type_to_oid(ty: &DataType) -> Result { DataType::UInt64 => DataTypeOid::Int8, DataType::Float16 | DataType::Float32 => DataTypeOid::Float4, DataType::Float64 => DataTypeOid::Float8, - DataType::Utf8 => DataTypeOid::Text, + DataType::Utf8 | DataType::Utf8View => DataTypeOid::Text, DataType::Date32 | DataType::Date64 => DataTypeOid::Date, DataType::Timestamp(_, None) => DataTypeOid::Timestamp, other => { diff --git a/convergence-arrow/tests/test_arrow.rs b/convergence-arrow/tests/test_arrow.rs index f1dc31e..3a9e8e8 100644 --- a/convergence-arrow/tests/test_arrow.rs +++ b/convergence-arrow/tests/test_arrow.rs @@ -6,7 +6,7 @@ use convergence::protocol_ext::DataRowBatch; use convergence::server::{self, BindOptions}; use convergence::sqlparser::ast::Statement; use convergence_arrow::table::{record_batch_to_rows, schema_to_field_desc}; -use datafusion::arrow::array::{ArrayRef, Date32Array, Float32Array, Int32Array, StringArray, TimestampSecondArray}; +use datafusion::arrow::array::{ArrayRef, Date32Array, Float32Array, Int32Array, StringArray, StringViewArray, TimestampSecondArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; use std::sync::Arc; @@ -32,6 +32,7 @@ impl ArrowEngine { let int_col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; let float_col = Arc::new(Float32Array::from(vec![1.5, 2.5, 3.5])) as ArrayRef; let string_col = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef; + let string_view_col = Arc::new(StringViewArray::from(vec!["aa", "bb", "cc"])) as ArrayRef; let ts_col = Arc::new(TimestampSecondArray::from(vec![1577836800, 1580515200, 1583020800])) as ArrayRef; let date_col = Arc::new(Date32Array::from(vec![0, 1, 2])) as ArrayRef; @@ -39,12 +40,13 @@ impl ArrowEngine { Field::new("int_col", DataType::Int32, true), Field::new("float_col", DataType::Float32, true), Field::new("string_col", DataType::Utf8, true), + Field::new("string_view_col", DataType::Utf8View, true), Field::new("ts_col", DataType::Timestamp(TimeUnit::Second, None), true), Field::new("date_col", DataType::Date32, true), ]); Self { - batch: RecordBatch::try_new(Arc::new(schema), vec![int_col, float_col, string_col, ts_col, date_col]) + batch: RecordBatch::try_new(Arc::new(schema), vec![int_col, float_col, string_col, string_view_col, ts_col, date_col]) .expect("failed to create batch"), } } @@ -89,8 +91,8 @@ async fn basic_data_types() { let rows = client.query("select 1", &[]).await.unwrap(); let get_row = |idx: usize| { let row = &rows[idx]; - let cols: (i32, f32, &str, NaiveDateTime, NaiveDate) = - (row.get(0), row.get(1), row.get(2), row.get(3), row.get(4)); + let cols: (i32, f32, &str, &str, NaiveDateTime, NaiveDate) = + (row.get(0), row.get(1), row.get(2), row.get(3), row.get(4), row.get(5)); cols }; @@ -100,6 +102,7 @@ async fn basic_data_types() { 1, 1.5, "a", + "aa", NaiveDate::from_ymd_opt(2020, 1, 1) .unwrap() .and_hms_opt(0, 0, 0) @@ -113,6 +116,7 @@ async fn basic_data_types() { 2, 2.5, "b", + "bb", NaiveDate::from_ymd_opt(2020, 2, 1) .unwrap() .and_hms_opt(0, 0, 0) @@ -126,6 +130,7 @@ async fn basic_data_types() { 3, 3.5, "c", + "cc", NaiveDate::from_ymd_opt(2020, 3, 1) .unwrap() .and_hms_opt(0, 0, 0) From e8df7f7f8a875f32d20f715eb7af7dd2b90f08c9 Mon Sep 17 00:00:00 2001 From: Mike Shauneu Date: Sat, 26 Apr 2025 10:15:29 -0400 Subject: [PATCH 05/10] Unpin chrono --- convergence-arrow/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index c4e43cb..6f9d4a5 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -12,7 +12,7 @@ tokio = { version = "1" } async-trait = "0.1" datafusion = "47" convergence = { path = "../convergence", version = "0.16.0" } -chrono = "=0.4.39" +chrono = "0.4" [dev-dependencies] tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4" ] } From 24d05147c256bcc2132b25e182f77826234ac9c8 Mon Sep 17 00:00:00 2001 From: Mike Shauneu Date: Mon, 28 Apr 2025 08:53:02 -0400 Subject: [PATCH 06/10] Add Decimal128 support. (#297) --- .gitignore | 1 - convergence-arrow/Cargo.toml | 3 +-- convergence-arrow/src/table.rs | 9 ++++++--- convergence-arrow/tests/test_arrow.rs | 14 ++++++++++---- convergence/Cargo.toml | 3 +-- convergence/src/protocol.rs | 2 ++ convergence/src/protocol_ext.rs | 20 ++++++++++++++++++++ 7 files changed, 40 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 04fa3b6..77147e2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ /target Cargo.lock .idea/ - diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index 6f9d4a5..c8632d9 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -13,6 +13,5 @@ async-trait = "0.1" datafusion = "47" convergence = { path = "../convergence", version = "0.16.0" } chrono = "0.4" - -[dev-dependencies] tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4" ] } +rust_decimal = { version = "1.37.1", features = ["default", "db-postgres"] } diff --git a/convergence-arrow/src/table.rs b/convergence-arrow/src/table.rs index 34f9740..992d373 100644 --- a/convergence-arrow/src/table.rs +++ b/convergence-arrow/src/table.rs @@ -3,9 +3,10 @@ use convergence::protocol::{DataTypeOid, ErrorResponse, FieldDescription, SqlState}; use convergence::protocol_ext::DataRowBatch; use datafusion::arrow::array::{ - BooleanArray, Date32Array, Date64Array, Float16Array, Float32Array, Float64Array, Int16Array, Int32Array, - Int64Array, Int8Array, StringArray, StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, - TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, + BooleanArray, Date32Array, Date64Array, Decimal128Array, Float16Array, + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, StringArray, + StringViewArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array }; use datafusion::arrow::datatypes::{DataType, Schema, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; @@ -47,6 +48,7 @@ pub fn record_batch_to_rows(arrow_batch: &RecordBatch, pg_batch: &mut DataRowBat DataType::Float16 => row.write_float4(array_val!(Float16Array, col, row_idx).to_f32()), DataType::Float32 => row.write_float4(array_val!(Float32Array, col, row_idx)), DataType::Float64 => row.write_float8(array_val!(Float64Array, col, row_idx)), + DataType::Decimal128(p, s) => row.write_numeric_16(array_val!(Decimal128Array, col, row_idx), p, s), DataType::Utf8 => row.write_string(array_val!(StringArray, col, row_idx)), DataType::Utf8View => row.write_string(array_val!(StringViewArray, col, row_idx)), DataType::Date32 => { @@ -103,6 +105,7 @@ pub fn data_type_to_oid(ty: &DataType) -> Result { DataType::UInt64 => DataTypeOid::Int8, DataType::Float16 | DataType::Float32 => DataTypeOid::Float4, DataType::Float64 => DataTypeOid::Float8, + DataType::Decimal128(_, _) => DataTypeOid::Numeric, DataType::Utf8 | DataType::Utf8View => DataTypeOid::Text, DataType::Date32 | DataType::Date64 => DataTypeOid::Date, DataType::Timestamp(_, None) => DataTypeOid::Timestamp, diff --git a/convergence-arrow/tests/test_arrow.rs b/convergence-arrow/tests/test_arrow.rs index 3a9e8e8..aecf492 100644 --- a/convergence-arrow/tests/test_arrow.rs +++ b/convergence-arrow/tests/test_arrow.rs @@ -6,10 +6,11 @@ use convergence::protocol_ext::DataRowBatch; use convergence::server::{self, BindOptions}; use convergence::sqlparser::ast::Statement; use convergence_arrow::table::{record_batch_to_rows, schema_to_field_desc}; -use datafusion::arrow::array::{ArrayRef, Date32Array, Float32Array, Int32Array, StringArray, StringViewArray, TimestampSecondArray}; +use datafusion::arrow::array::{ArrayRef, Date32Array, Decimal128Array, Float32Array, Int32Array, StringArray, StringViewArray, TimestampSecondArray}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use datafusion::arrow::record_batch::RecordBatch; use std::sync::Arc; +use rust_decimal::Decimal; use tokio_postgres::{connect, NoTls}; struct ArrowPortal { @@ -31,6 +32,7 @@ impl ArrowEngine { fn new() -> Self { let int_col = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; let float_col = Arc::new(Float32Array::from(vec![1.5, 2.5, 3.5])) as ArrayRef; + let decimal_col = Arc::new(Decimal128Array::from(vec![11, 22, 33]).with_precision_and_scale(2, 0).unwrap()) as ArrayRef; let string_col = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef; let string_view_col = Arc::new(StringViewArray::from(vec!["aa", "bb", "cc"])) as ArrayRef; let ts_col = Arc::new(TimestampSecondArray::from(vec![1577836800, 1580515200, 1583020800])) as ArrayRef; @@ -39,6 +41,7 @@ impl ArrowEngine { let schema = Schema::new(vec![ Field::new("int_col", DataType::Int32, true), Field::new("float_col", DataType::Float32, true), + Field::new("decimal_col", DataType::Decimal128(2, 0), true), Field::new("string_col", DataType::Utf8, true), Field::new("string_view_col", DataType::Utf8View, true), Field::new("ts_col", DataType::Timestamp(TimeUnit::Second, None), true), @@ -46,7 +49,7 @@ impl ArrowEngine { ]); Self { - batch: RecordBatch::try_new(Arc::new(schema), vec![int_col, float_col, string_col, string_view_col, ts_col, date_col]) + batch: RecordBatch::try_new(Arc::new(schema), vec![int_col, float_col, decimal_col, string_col, string_view_col, ts_col, date_col]) .expect("failed to create batch"), } } @@ -91,8 +94,8 @@ async fn basic_data_types() { let rows = client.query("select 1", &[]).await.unwrap(); let get_row = |idx: usize| { let row = &rows[idx]; - let cols: (i32, f32, &str, &str, NaiveDateTime, NaiveDate) = - (row.get(0), row.get(1), row.get(2), row.get(3), row.get(4), row.get(5)); + let cols: (i32, f32, Decimal, &str, &str, NaiveDateTime, NaiveDate) = + (row.get(0), row.get(1), row.get(2), row.get(3), row.get(4), row.get(5), row.get(6)); cols }; @@ -101,6 +104,7 @@ async fn basic_data_types() { ( 1, 1.5, + Decimal::from(11), "a", "aa", NaiveDate::from_ymd_opt(2020, 1, 1) @@ -115,6 +119,7 @@ async fn basic_data_types() { ( 2, 2.5, + Decimal::from(22), "b", "bb", NaiveDate::from_ymd_opt(2020, 2, 1) @@ -129,6 +134,7 @@ async fn basic_data_types() { ( 3, 3.5, + Decimal::from(33), "c", "cc", NaiveDate::from_ymd_opt(2020, 3, 1) diff --git a/convergence/Cargo.toml b/convergence/Cargo.toml index b433b33..8ed45c6 100644 --- a/convergence/Cargo.toml +++ b/convergence/Cargo.toml @@ -16,6 +16,5 @@ futures = "0.3" sqlparser = "0.46" async-trait = "0.1" chrono = "0.4" - -[dev-dependencies] +rust_decimal = { version = "1.37.1", features = ["default", "db-postgres"] } tokio-postgres = "0.7" diff --git a/convergence/src/protocol.rs b/convergence/src/protocol.rs index 8ac5bdf..bfae419 100644 --- a/convergence/src/protocol.rs +++ b/convergence/src/protocol.rs @@ -75,6 +75,8 @@ data_types! { Float4 = 700, 4 Float8 = 701, 8 + Numeric = 1700, -1 + Date = 1082, 4 Timestamp = 1114, 8 diff --git a/convergence/src/protocol_ext.rs b/convergence/src/protocol_ext.rs index b146775..f2db117 100644 --- a/convergence/src/protocol_ext.rs +++ b/convergence/src/protocol_ext.rs @@ -3,6 +3,8 @@ use crate::protocol::{ConnectionCodec, FormatCode, ProtocolError, RowDescription}; use bytes::{BufMut, BytesMut}; use chrono::{NaiveDate, NaiveDateTime}; +use rust_decimal::Decimal; +use tokio_postgres::types::{ToSql, Type}; use tokio_util::codec::Encoder; /// Supports batched rows for e.g. returning portal result sets. @@ -131,6 +133,24 @@ impl<'a> DataRowWriter<'a> { } } + /// Writes a numeric value for the next column. + pub fn write_numeric_16(&mut self, val: i128, _p: &u8, s: &i8) { + let decimal = Decimal::from_i128_with_scale(val, *s as u32); + match self.parent.format_code { + FormatCode::Text => { + self.write_string(&decimal.to_string()) + } + FormatCode::Binary => { + let numeric_type = Type::from_oid(1700).expect("failed to create numeric type"); + let mut buf = BytesMut::new(); + decimal.to_sql(&numeric_type, &mut buf) + .expect("failed to write numeric"); + + self.write_value(&buf.freeze()) + } + }; + } + primitive_write!(write_int2, i16); primitive_write!(write_int4, i32); primitive_write!(write_int8, i64); From 546ee8d0448831876c6f9d35a12f558809c5b8b2 Mon Sep 17 00:00:00 2001 From: Ruan Pearce-Authers Date: Mon, 28 Apr 2025 13:54:30 +0100 Subject: [PATCH 07/10] Release 0.17.0 convergence@0.17.0 convergence-arrow@0.17.0 Generated by cargo-workspaces --- convergence-arrow/Cargo.toml | 4 ++-- convergence/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index c8632d9..7822538 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "convergence-arrow" -version = "0.16.0" +version = "0.17.0" authors = ["Ruan Pearce-Authers "] edition = "2018" description = "Utils for bridging Apache Arrow and PostgreSQL's wire protocol" @@ -11,7 +11,7 @@ repository = "https://github.com/returnString/convergence" tokio = { version = "1" } async-trait = "0.1" datafusion = "47" -convergence = { path = "../convergence", version = "0.16.0" } +convergence = { path = "../convergence", version = "0.17.0" } chrono = "0.4" tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4" ] } rust_decimal = { version = "1.37.1", features = ["default", "db-postgres"] } diff --git a/convergence/Cargo.toml b/convergence/Cargo.toml index 8ed45c6..3079181 100644 --- a/convergence/Cargo.toml +++ b/convergence/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "convergence" -version = "0.16.0" +version = "0.17.0" authors = ["Ruan Pearce-Authers "] edition = "2018" description = "Write servers that speak PostgreSQL's wire protocol" From 2b92438e5353a0254ec3a98e4a41e412d67f3eb3 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Tue, 31 Dec 2024 17:43:27 -0800 Subject: [PATCH 08/10] downgrade datafusion to 43 # Conflicts: # convergence-arrow/Cargo.toml # convergence-arrow/src/metadata.rs --- convergence-arrow/Cargo.toml | 4 ++-- convergence-arrow/src/metadata.rs | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index 7822538..5f95eca 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -10,8 +10,8 @@ repository = "https://github.com/returnString/convergence" [dependencies] tokio = { version = "1" } async-trait = "0.1" -datafusion = "47" -convergence = { path = "../convergence", version = "0.17.0" } +datafusion = "43" +convergence = { path = "../convergence", version = "0.16.0" } chrono = "0.4" tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4" ] } rust_decimal = { version = "1.37.1", features = ["default", "db-postgres"] } diff --git a/convergence-arrow/src/metadata.rs b/convergence-arrow/src/metadata.rs index 2bec6f2..6417711 100644 --- a/convergence-arrow/src/metadata.rs +++ b/convergence-arrow/src/metadata.rs @@ -3,8 +3,9 @@ use datafusion::arrow::array::{ArrayRef, Int32Builder, StringBuilder, UInt32Builder}; use datafusion::arrow::datatypes::{Field, Schema}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::catalog::memory::MemorySchemaProvider; -use datafusion::catalog::{CatalogProvider, SchemaProvider}; +use datafusion::catalog::CatalogProvider; +use datafusion::catalog::SchemaProvider; +use datafusion::catalog_common::memory::MemorySchemaProvider; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::DataFusionError; use std::convert::TryInto; From 900430c82ae5c56199631c442a90f5ddcc5ee6dc Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Wed, 1 Jan 2025 12:58:57 -0800 Subject: [PATCH 09/10] fix test --- convergence-arrow/examples/datafusion.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/convergence-arrow/examples/datafusion.rs b/convergence-arrow/examples/datafusion.rs index e5cdd0b..d1c40f3 100644 --- a/convergence-arrow/examples/datafusion.rs +++ b/convergence-arrow/examples/datafusion.rs @@ -2,8 +2,9 @@ use convergence::server::{self, BindOptions}; use convergence_arrow::datafusion::DataFusionEngine; use convergence_arrow::metadata::Catalog; use datafusion::arrow::datatypes::DataType; -use datafusion::catalog::memory::MemorySchemaProvider; -use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider}; +use datafusion::catalog_common::memory::MemorySchemaProvider; +use datafusion::catalog::CatalogProvider; +use datafusion::catalog_common::MemoryCatalogProvider; use datafusion::logical_expr::Volatility; use datafusion::physical_plan::ColumnarValue; use datafusion::prelude::*; From c1b20f5de7cf700b47329108b351f812d561ebcc Mon Sep 17 00:00:00 2001 From: Mike Shauneu Date: Mon, 28 Apr 2025 13:11:33 -0400 Subject: [PATCH 10/10] downgrade chrono and rust-decimal --- convergence-arrow/Cargo.toml | 6 +++--- convergence/Cargo.toml | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/convergence-arrow/Cargo.toml b/convergence-arrow/Cargo.toml index 5f95eca..8b5fa2a 100644 --- a/convergence-arrow/Cargo.toml +++ b/convergence-arrow/Cargo.toml @@ -11,7 +11,7 @@ repository = "https://github.com/returnString/convergence" tokio = { version = "1" } async-trait = "0.1" datafusion = "43" -convergence = { path = "../convergence", version = "0.16.0" } -chrono = "0.4" +convergence = { path = "../convergence", version = "0.17.0" } +chrono = "=0.4.39" tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4" ] } -rust_decimal = { version = "1.37.1", features = ["default", "db-postgres"] } +rust_decimal = { version = "1.36.0", features = ["default", "db-postgres"] } diff --git a/convergence/Cargo.toml b/convergence/Cargo.toml index 3079181..9f6287e 100644 --- a/convergence/Cargo.toml +++ b/convergence/Cargo.toml @@ -15,6 +15,6 @@ bytes = "1" futures = "0.3" sqlparser = "0.46" async-trait = "0.1" -chrono = "0.4" -rust_decimal = { version = "1.37.1", features = ["default", "db-postgres"] } +chrono = "=0.4.39" +rust_decimal = { version = "1.36.0", features = ["default", "db-postgres"] } tokio-postgres = "0.7"