From 3a4041a682c7ca2d751de2aad536e1c0b2ca385c Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 6 Mar 2026 14:45:33 +0800 Subject: [PATCH 1/2] ser/de fetch in FilterExec --- datafusion/physical-plan/src/filter.rs | 4 +++ datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 26 ++++++++++++++++--- datafusion/proto/src/generated/prost.rs | 2 ++ datafusion/proto/src/physical_plan/mod.rs | 2 ++ .../tests/cases/roundtrip_physical_plan.rs | 13 ++++++++++ 6 files changed, 45 insertions(+), 3 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ecea4e6ebe9f7..b9e3dd22731aa 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -717,6 +717,10 @@ impl ExecutionPlan for FilterExec { }) } + fn fetch(&self) -> Option { + self.fetch + } + fn with_fetch(&self, fetch: Option) -> Option> { Some(Arc::new(Self { predicate: Arc::clone(&self.predicate), diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 7c0268867691e..37b31a84deab1 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -1030,6 +1030,7 @@ message FilterExecNode { uint32 default_filter_selectivity = 3; repeated uint32 projection = 9; uint32 batch_size = 10; + optional uint32 fetch = 11; } message FileGroup { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 5b2b9133ce13a..8bc9f6a44c26b 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -6894,6 +6894,9 @@ impl serde::Serialize for FilterExecNode { if self.batch_size != 0 { len += 1; } + if self.fetch.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FilterExecNode", len)?; if let Some(v) = self.input.as_ref() { struct_ser.serialize_field("input", v)?; @@ -6910,6 +6913,10 @@ impl serde::Serialize 
for FilterExecNode { if self.batch_size != 0 { struct_ser.serialize_field("batchSize", &self.batch_size)?; } + if let Some(v) = self.fetch.as_ref() { + #[allow(clippy::needless_borrow)] + struct_ser.serialize_field("fetch", ToString::to_string(&v).as_str())?; + } struct_ser.end() } } @@ -6927,6 +6934,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { "projection", "batch_size", "batchSize", + "fetch", ]; #[allow(clippy::enum_variant_names)] @@ -6936,6 +6944,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { DefaultFilterSelectivity, Projection, BatchSize, + Fetch, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6962,6 +6971,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { "defaultFilterSelectivity" | "default_filter_selectivity" => Ok(GeneratedField::DefaultFilterSelectivity), "projection" => Ok(GeneratedField::Projection), "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), + "fetch" => Ok(GeneratedField::Fetch), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6986,6 +6996,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { let mut default_filter_selectivity__ = None; let mut projection__ = None; let mut batch_size__ = None; + let mut fetch__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::Input => { @@ -7004,7 +7015,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { if default_filter_selectivity__.is_some() { return Err(serde::de::Error::duplicate_field("defaultFilterSelectivity")); } - default_filter_selectivity__ = + default_filter_selectivity__ = Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } @@ -7012,7 +7023,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { if projection__.is_some() { return Err(serde::de::Error::duplicate_field("projection")); } - projection__ = + projection__ = Some(map_.next_value::>>()? 
.into_iter().map(|x| x.0).collect()) ; @@ -7021,10 +7032,18 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { if batch_size__.is_some() { return Err(serde::de::Error::duplicate_field("batchSize")); } - batch_size__ = + batch_size__ = Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } + GeneratedField::Fetch => { + if fetch__.is_some() { + return Err(serde::de::Error::duplicate_field("fetch")); + } + fetch__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } } } Ok(FilterExecNode { @@ -7033,6 +7052,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { default_filter_selectivity: default_filter_selectivity__.unwrap_or_default(), projection: projection__.unwrap_or_default(), batch_size: batch_size__.unwrap_or_default(), + fetch: fetch__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d9602665c284a..a0d4ef9e973c4 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1563,6 +1563,8 @@ pub struct FilterExecNode { pub projection: ::prost::alloc::vec::Vec, #[prost(uint32, tag = "10")] pub batch_size: u32, + #[prost(uint32, optional, tag = "11")] + pub fetch: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct FileGroup { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index bfba715b91249..47fa1319c5986 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -691,6 +691,7 @@ impl protobuf::PhysicalPlanNode { let filter = FilterExecBuilder::new(predicate, input) .apply_projection(projection)? 
.with_batch_size(filter.batch_size as usize) + .with_fetch(filter.fetch.map(|f| f as usize)) .build()?; match filter_selectivity { Ok(filter_selectivity) => Ok(Arc::new( @@ -2320,6 +2321,7 @@ impl protobuf::PhysicalPlanNode { v.iter().map(|x| *x as u32).collect::>() }), batch_size: exec.batch_size() as u32, + fetch: exec.fetch().map(|f| f as u32), }, ))), }) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 230727c8c1d41..2b8c1056f3b2c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -794,6 +794,19 @@ fn roundtrip_filter_with_not_and_in_list() -> Result<()> { )?)) } +#[test] +fn roundtrip_filter_with_fetch() -> Result<()> { + let field_a = Field::new("a", DataType::Boolean, false); + let field_b = Field::new("b", DataType::Int64, false); + let schema = Arc::new(Schema::new(vec![field_a, field_b])); + let predicate = col("a", &schema)?; + let filter = FilterExecBuilder::new(predicate, Arc::new(EmptyExec::new(schema))) + .with_fetch(Some(10)) + .build()?; + assert_eq!(filter.fetch(), Some(10)); + roundtrip_test(Arc::new(filter)) +} + #[test] fn roundtrip_sort() -> Result<()> { let field_a = Field::new("a", DataType::Boolean, false); From d70fb35875a3b3522250d1227869aa1816e1cdba Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 6 Mar 2026 14:52:33 +0800 Subject: [PATCH 2/2] update --- datafusion/proto/src/generated/pbjson.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 8bc9f6a44c26b..419105c40c792 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -6914,8 +6914,7 @@ impl serde::Serialize for FilterExecNode { struct_ser.serialize_field("batchSize", &self.batch_size)?; } if let Some(v) = self.fetch.as_ref() { - 
#[allow(clippy::needless_borrow)] - struct_ser.serialize_field("fetch", ToString::to_string(&v).as_str())?; + struct_ser.serialize_field("fetch", v)?; } struct_ser.end() } @@ -7015,7 +7014,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { if default_filter_selectivity__.is_some() { return Err(serde::de::Error::duplicate_field("defaultFilterSelectivity")); } - default_filter_selectivity__ = + default_filter_selectivity__ = Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } @@ -7023,7 +7022,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { if projection__.is_some() { return Err(serde::de::Error::duplicate_field("projection")); } - projection__ = + projection__ = Some(map_.next_value::>>()? .into_iter().map(|x| x.0).collect()) ; @@ -7032,7 +7031,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { if batch_size__.is_some() { return Err(serde::de::Error::duplicate_field("batchSize")); } - batch_size__ = + batch_size__ = Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) ; } @@ -7040,7 +7039,7 @@ impl<'de> serde::Deserialize<'de> for FilterExecNode { if fetch__.is_some() { return Err(serde::de::Error::duplicate_field("fetch")); } - fetch__ = + fetch__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; }