Skip to content

Commit 7364d53

Browse files
committed
address review
1 parent e2275e2 commit 7364d53

4 files changed

Lines changed: 45 additions & 38 deletions

File tree

datafusion/proto/proto/datafusion.proto

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,9 @@ message CopyToNode {
270270
repeated string partition_by = 7;
271271
}
272272

273+
// Identifies a built-in file format supported by DataFusion.
274+
// Used by DefaultLogicalExtensionCodec to serialize/deserialize
275+
// FileFormatFactory instances (e.g. in CopyTo plans).
273276
enum FileFormatKind {
274277
FILE_FORMAT_KIND_UNSPECIFIED = 0;
275278
FILE_FORMAT_KIND_CSV = 1;
@@ -279,9 +282,11 @@ enum FileFormatKind {
279282
FILE_FORMAT_KIND_AVRO = 5;
280283
}
281284

285+
// Wraps a serialized FileFormatFactory with its format kind tag,
286+
// so the decoder can dispatch to the correct format-specific codec.
282287
message FileFormatProto {
283288
FileFormatKind kind = 1;
284-
bytes options = 2;
289+
bytes encoded_file_format = 2;
285290
}
286291

287292
message DmlNode{

datafusion/proto/src/generated/pbjson.rs

Lines changed: 13 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,15 @@ use datafusion_datasource::file_format::FileFormat;
4444
use datafusion_datasource::file_format::{
4545
FileFormatFactory, file_type_to_format, format_as_file_type,
4646
};
47-
use datafusion_datasource_arrow::file_format::ArrowFormat;
47+
use datafusion_datasource_arrow::file_format::{ArrowFormat, ArrowFormatFactory};
4848
#[cfg(feature = "avro")]
4949
use datafusion_datasource_avro::file_format::AvroFormat;
50-
use datafusion_datasource_csv::file_format::CsvFormat;
51-
use datafusion_datasource_json::file_format::JsonFormat as OtherNdJsonFormat;
50+
use datafusion_datasource_csv::file_format::{CsvFormat, CsvFormatFactory};
51+
use datafusion_datasource_json::file_format::{
52+
JsonFormat as OtherNdJsonFormat, JsonFormatFactory,
53+
};
5254
#[cfg(feature = "parquet")]
53-
use datafusion_datasource_parquet::file_format::ParquetFormat;
55+
use datafusion_datasource_parquet::file_format::{ParquetFormat, ParquetFormatFactory};
5456
use datafusion_expr::{
5557
AggregateUDF, DmlStatement, FetchType, RecursiveQuery, SkipType, TableSource, Unnest,
5658
};
@@ -213,8 +215,6 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
213215
buf: &[u8],
214216
ctx: &TaskContext,
215217
) -> Result<Arc<dyn FileFormatFactory>> {
216-
use prost::Message;
217-
218218
let proto = protobuf::FileFormatProto::decode(buf).map_err(|e| {
219219
internal_datafusion_err!("Failed to decode FileFormatProto: {e}")
220220
})?;
@@ -225,18 +225,18 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
225225

226226
match kind {
227227
protobuf::FileFormatKind::Csv => file_formats::CsvLogicalExtensionCodec
228-
.try_decode_file_format(&proto.options, ctx),
228+
.try_decode_file_format(&proto.encoded_file_format, ctx),
229229
protobuf::FileFormatKind::Json => file_formats::JsonLogicalExtensionCodec
230-
.try_decode_file_format(&proto.options, ctx),
230+
.try_decode_file_format(&proto.encoded_file_format, ctx),
231231
#[cfg(feature = "parquet")]
232232
protobuf::FileFormatKind::Parquet => {
233233
file_formats::ParquetLogicalExtensionCodec
234-
.try_decode_file_format(&proto.options, ctx)
234+
.try_decode_file_format(&proto.encoded_file_format, ctx)
235235
}
236236
protobuf::FileFormatKind::Arrow => file_formats::ArrowLogicalExtensionCodec
237-
.try_decode_file_format(&proto.options, ctx),
237+
.try_decode_file_format(&proto.encoded_file_format, ctx),
238238
protobuf::FileFormatKind::Avro => file_formats::AvroLogicalExtensionCodec
239-
.try_decode_file_format(&proto.options, ctx),
239+
.try_decode_file_format(&proto.encoded_file_format, ctx),
240240
#[cfg(not(feature = "parquet"))]
241241
protobuf::FileFormatKind::Parquet => {
242242
not_impl_err!("Parquet support requires the 'parquet' feature")
@@ -252,33 +252,29 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
252252
buf: &mut Vec<u8>,
253253
node: Arc<dyn FileFormatFactory>,
254254
) -> Result<()> {
255-
use datafusion_datasource_arrow::file_format::ArrowFormatFactory;
256-
use datafusion_datasource_csv::file_format::CsvFormatFactory;
257-
use datafusion_datasource_json::file_format::JsonFormatFactory;
258-
use prost::Message;
255+
let mut encoded_file_format = Vec::new();
259256

260257
let any = node.as_any();
261-
let mut options = Vec::new();
262-
263258
let kind = if any.downcast_ref::<CsvFormatFactory>().is_some() {
264259
file_formats::CsvLogicalExtensionCodec
265-
.try_encode_file_format(&mut options, Arc::clone(&node))?;
260+
.try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
266261
protobuf::FileFormatKind::Csv
267262
} else if any.downcast_ref::<JsonFormatFactory>().is_some() {
268263
file_formats::JsonLogicalExtensionCodec
269-
.try_encode_file_format(&mut options, Arc::clone(&node))?;
264+
.try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
270265
protobuf::FileFormatKind::Json
271266
} else if any.downcast_ref::<ArrowFormatFactory>().is_some() {
272267
file_formats::ArrowLogicalExtensionCodec
273-
.try_encode_file_format(&mut options, Arc::clone(&node))?;
268+
.try_encode_file_format(&mut encoded_file_format, Arc::clone(&node))?;
274269
protobuf::FileFormatKind::Arrow
275270
} else {
276271
#[cfg(feature = "parquet")]
277272
{
278-
use datafusion_datasource_parquet::file_format::ParquetFormatFactory;
279273
if any.downcast_ref::<ParquetFormatFactory>().is_some() {
280-
file_formats::ParquetLogicalExtensionCodec
281-
.try_encode_file_format(&mut options, Arc::clone(&node))?;
274+
file_formats::ParquetLogicalExtensionCodec.try_encode_file_format(
275+
&mut encoded_file_format,
276+
Arc::clone(&node),
277+
)?;
282278
protobuf::FileFormatKind::Parquet
283279
} else {
284280
return not_impl_err!(
@@ -296,7 +292,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
296292

297293
let proto = protobuf::FileFormatProto {
298294
kind: kind as i32,
299-
options,
295+
encoded_file_format,
300296
};
301297
proto.encode(buf).map_err(|e| {
302298
internal_datafusion_err!("Failed to encode FileFormatProto: {e}")

0 commit comments

Comments
 (0)