-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat: make DefaultLogicalExtensionCodec support serialisation of buil… #20638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -207,6 +207,102 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { | |
| ) -> Result<()> { | ||
| not_impl_err!("LogicalExtensionCodec is not provided") | ||
| } | ||
|
|
||
| fn try_decode_file_format( | ||
| &self, | ||
| buf: &[u8], | ||
| ctx: &TaskContext, | ||
| ) -> Result<Arc<dyn FileFormatFactory>> { | ||
| use prost::Message; | ||
|
Acfboy marked this conversation as resolved.
Outdated
Acfboy marked this conversation as resolved.
Outdated
|
||
|
|
||
| let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| { | ||
| internal_datafusion_err!("Failed to decode FileFormatProto: {e}") | ||
| })?; | ||
|
|
||
| let kind = protobuf::FileFormatKind::try_from(proto.kind).map_err(|_| { | ||
| internal_datafusion_err!("Unknown FileFormatKind: {}", proto.kind) | ||
| })?; | ||
|
|
||
| match kind { | ||
| protobuf::FileFormatKind::Csv => file_formats::CsvLogicalExtensionCodec | ||
| .try_decode_file_format(&proto.options, ctx), | ||
| protobuf::FileFormatKind::Json => file_formats::JsonLogicalExtensionCodec | ||
| .try_decode_file_format(&proto.options, ctx), | ||
| #[cfg(feature = "parquet")] | ||
| protobuf::FileFormatKind::Parquet => { | ||
| file_formats::ParquetLogicalExtensionCodec | ||
| .try_decode_file_format(&proto.options, ctx) | ||
| } | ||
| protobuf::FileFormatKind::Arrow => file_formats::ArrowLogicalExtensionCodec | ||
| .try_decode_file_format(&proto.options, ctx), | ||
| protobuf::FileFormatKind::Avro => file_formats::AvroLogicalExtensionCodec | ||
| .try_decode_file_format(&proto.options, ctx), | ||
| #[cfg(not(feature = "parquet"))] | ||
| protobuf::FileFormatKind::Parquet => { | ||
| not_impl_err!("Parquet support requires the 'parquet' feature") | ||
| } | ||
| protobuf::FileFormatKind::Unspecified => { | ||
| not_impl_err!("Unspecified file format kind") | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn try_encode_file_format( | ||
| &self, | ||
| buf: &mut Vec<u8>, | ||
| node: Arc<dyn FileFormatFactory>, | ||
| ) -> Result<()> { | ||
| use datafusion_datasource_arrow::file_format::ArrowFormatFactory; | ||
| use datafusion_datasource_csv::file_format::CsvFormatFactory; | ||
| use datafusion_datasource_json::file_format::JsonFormatFactory; | ||
| use prost::Message; | ||
|
Acfboy marked this conversation as resolved.
Outdated
Acfboy marked this conversation as resolved.
Outdated
|
||
|
|
||
| let any = node.as_any(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to extract variable here?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean we should inline
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, sorry for confusion
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thx for clarification!
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to extract it as a variable here |
||
| let mut options = Vec::new(); | ||
|
Acfboy marked this conversation as resolved.
Outdated
Acfboy marked this conversation as resolved.
Outdated
|
||
|
|
||
| let kind = if any.downcast_ref::<CsvFormatFactory>().is_some() { | ||
| file_formats::CsvLogicalExtensionCodec | ||
| .try_encode_file_format(&mut options, Arc::clone(&node))?; | ||
| protobuf::FileFormatKind::Csv | ||
| } else if any.downcast_ref::<JsonFormatFactory>().is_some() { | ||
| file_formats::JsonLogicalExtensionCodec | ||
| .try_encode_file_format(&mut options, Arc::clone(&node))?; | ||
| protobuf::FileFormatKind::Json | ||
| } else if any.downcast_ref::<ArrowFormatFactory>().is_some() { | ||
| file_formats::ArrowLogicalExtensionCodec | ||
| .try_encode_file_format(&mut options, Arc::clone(&node))?; | ||
| protobuf::FileFormatKind::Arrow | ||
| } else { | ||
| #[cfg(feature = "parquet")] | ||
| { | ||
| use datafusion_datasource_parquet::file_format::ParquetFormatFactory; | ||
| if any.downcast_ref::<ParquetFormatFactory>().is_some() { | ||
| file_formats::ParquetLogicalExtensionCodec | ||
| .try_encode_file_format(&mut options, Arc::clone(&node))?; | ||
| protobuf::FileFormatKind::Parquet | ||
| } else { | ||
| return not_impl_err!( | ||
| "Unsupported FileFormatFactory type for DefaultLogicalExtensionCodec" | ||
| ); | ||
| } | ||
| } | ||
| #[cfg(not(feature = "parquet"))] | ||
| { | ||
| return not_impl_err!( | ||
| "Unsupported FileFormatFactory type for DefaultLogicalExtensionCodec" | ||
| ); | ||
| } | ||
| }; | ||
|
|
||
| let proto = protobuf::FileFormatProto { | ||
| kind: kind as i32, | ||
| options, | ||
| }; | ||
| proto.encode(buf).map_err(|e| { | ||
| internal_datafusion_err!("Failed to encode FileFormatProto: {e}") | ||
| })?; | ||
| Ok(()) | ||
| } | ||
| } | ||
|
|
||
| #[macro_export] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we add some comments desribing what this message does