From e939ea037728e1e01396cffdc8cbf2fe58541867 Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Sun, 1 Mar 2026 15:04:25 +1100 Subject: [PATCH 01/11] Add "encoding" option to CsvOptions and protobuf schemas --- Cargo.lock | 10 ++++++++++ Cargo.toml | 1 + datafusion/common/src/config.rs | 1 + datafusion/datasource-csv/Cargo.toml | 1 + .../proto-common/proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 2 ++ datafusion/proto-common/src/generated/pbjson.rs | 17 +++++++++++++++++ datafusion/proto-common/src/generated/prost.rs | 3 +++ datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 +++ .../proto/src/logical_plan/file_formats.rs | 6 ++++++ 11 files changed, 46 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 38fa83dd12119..f0dfee52cabf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2017,6 +2017,7 @@ dependencies = [ "datafusion-physical-expr-common", "datafusion-physical-plan", "datafusion-session", + "encoding_rs", "futures", "object_store", "regex", @@ -2830,6 +2831,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" diff --git a/Cargo.toml b/Cargo.toml index d057261f7a2e1..c4825e7878510 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,6 +151,7 @@ datafusion-sql = { path = "datafusion/sql", version = "52.2.0" } datafusion-substrait = { path = "datafusion/substrait", version = "52.2.0" } doc-comment = "0.3" +encoding_rs = "0.8" env_logger = "0.11" flate2 = "1.1.9" futures = "0.3" diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d71af206c78d5..d55fa8ff2daf5 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -2921,6 +2921,7 @@ config_namespace! { /// /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. pub newlines_in_values: Option, default = None + pub encoding: Option, default = None pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED /// Compression level for the output file. The valid range depends on the /// compression algorithm: diff --git a/datafusion/datasource-csv/Cargo.toml b/datafusion/datasource-csv/Cargo.toml index 295092512742b..0f83fa8ac79fc 100644 --- a/datafusion/datasource-csv/Cargo.toml +++ b/datafusion/datasource-csv/Cargo.toml @@ -42,6 +42,7 @@ datafusion-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } +encoding_rs = { workspace = true, optional = true } futures = { workspace = true } object_store = { workspace = true } regex = { workspace = true } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 62c6bbe85612a..88c7f6430a799 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -476,6 +476,7 @@ message CsvOptions { bytes terminator = 17; // Optional terminator character as a byte bytes truncated_rows = 18; // Indicates if truncated rows are allowed optional uint32 compression_level = 19; // Optional compression level + string encoding = 20; // Optional character encoding } // Options controlling CSV format diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index ca8a269958d73..c7100f5e3210a 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -984,6 +984,8 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { escape: proto_opts.escape.first().copied(), double_quote: proto_opts.double_quote.first().map(|h| *h != 0), newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0), + encoding: (!proto_opts.encoding.is_empty()) + .then(|| proto_opts.encoding.clone()), compression: proto_opts.compression().into(), compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index b00e7546bba20..11d9f8db4e09f 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1701,6 +1701,9 @@ impl serde::Serialize for CsvOptions { if self.compression_level.is_some() { len += 1; } + if !self.encoding.is_empty() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; if !self.has_header.is_empty() { #[allow(clippy::needless_borrow)] @@ -1781,6 +1784,9 @@ impl serde::Serialize for CsvOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } + if !self.encoding.is_empty() { + struct_ser.serialize_field("encoding", &self.encoding)?; + } struct_ser.end() } } @@ -1823,6 +1829,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "truncatedRows", "compression_level", "compressionLevel", + "encoding", ]; #[allow(clippy::enum_variant_names)] @@ -1846,6 +1853,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Terminator, TruncatedRows, CompressionLevel, + Encoding, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1886,6 +1894,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "terminator" => Ok(GeneratedField::Terminator), "truncatedRows" | "truncated_rows" => Ok(GeneratedField::TruncatedRows), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), + "encoding" => Ok(GeneratedField::Encoding), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1924,6 +1933,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut terminator__ = None; let mut truncated_rows__ = None; let mut compression_level__ = None; + let mut encoding__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::HasHeader => { @@ -2062,6 +2072,12 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::Encoding => { + if encoding__.is_some() { + return Err(serde::de::Error::duplicate_field("encoding")); + } + encoding__ = Some(map_.next_value()?); + } } } Ok(CsvOptions { @@ -2084,6 +2100,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { terminator: terminator__.unwrap_or_default(), truncated_rows: truncated_rows__.unwrap_or_default(), compression_level: compression_level__, + encoding: encoding__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index a09826a29be52..f1b310357c6d9 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -672,6 +672,9 @@ pub struct CsvOptions { /// Optional compression level #[prost(uint32, optional, tag = "19")] pub compression_level: ::core::option::Option, + /// Optional character encoding + #[prost(string, tag = "20")] + pub encoding: ::prost::alloc::string::String, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 79e3306a4df1b..014875281cefd 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -986,6 +986,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { newlines_in_values: opts .newlines_in_values .map_or_else(Vec::new, |h| vec![h as u8]), + encoding: opts.encoding.clone().unwrap_or_default(), compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), date_format: opts.date_format.clone().unwrap_or_default(), diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index a09826a29be52..f1b310357c6d9 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -672,6 +672,9 @@ pub struct CsvOptions { /// Optional compression level #[prost(uint32, optional, tag = "19")] pub compression_level: ::core::option::Option, + /// Optional character encoding + #[prost(string, tag = "20")] + pub encoding: ::prost::alloc::string::String, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 08f42b0af7290..bf0b472a4096c 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -45,6 +45,7 @@ impl CsvOptionsProto { terminator: options.terminator.map_or(vec![], |v| vec![v]), escape: options.escape.map_or(vec![], |v| vec![v]), double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]), + encoding: options.encoding.clone().unwrap_or_default(), compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), date_format: options.date_format.clone().unwrap_or_default(), @@ -95,6 +96,11 @@ impl From<&CsvOptionsProto> for CsvOptions { } else { None }, + encoding: if !proto.encoding.is_empty() { + Some(proto.encoding.clone()) + } else { + None + }, compression: match proto.compression { 0 => CompressionTypeVariant::GZIP, 1 => CompressionTypeVariant::BZIP2, From c2d4a20e553705bcfb1c2c723d1838c693d8832e Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Sun, 1 Mar 2026 13:27:52 +1100 Subject: [PATCH 02/11] Implement charset decoding in the CSV reader, and refactor surrounding code slightly --- datafusion/datasource-csv/src/encoding.rs | 142 ++++++++++++++++++++++ datafusion/datasource-csv/src/mod.rs | 2 + datafusion/datasource-csv/src/source.rs | 86 +++++++++---- datafusion/datasource/src/decoder.rs | 122 +++++++++++-------- 4 files changed, 280 insertions(+), 72 deletions(-) create mode 100644 datafusion/datasource-csv/src/encoding.rs diff --git a/datafusion/datasource-csv/src/encoding.rs b/datafusion/datasource-csv/src/encoding.rs new file mode 100644 index 0000000000000..d948e6419f062 --- /dev/null +++ b/datafusion/datasource-csv/src/encoding.rs @@ -0,0 +1,142 @@ +use std::fmt::Debug; + +use arrow::array::RecordBatch; +use arrow::error::ArrowError; +use datafusion_common::Result; +use datafusion_datasource::decoder::Decoder; +use encoding_rs::{CoderResult, Encoding}; + +use self::buffer::Buffer; + +/// Default capacity of the buffer used to decode non-UTF-8 charset streams +static DECODE_BUFFER_CAP: usize = 8 * 1024; + +/// A `Decoder` that decodes input bytes from the specified character encoding +/// to UTF-8 before passing them onto the inner `Decoder`. +pub struct CharsetDecoder { + inner: T, + charset_decoder: encoding_rs::Decoder, + buffer: Buffer, +} + +impl CharsetDecoder { + pub fn new(inner: T, encoding: &'static Encoding) -> Self { + Self { + inner, + charset_decoder: encoding.new_decoder(), + buffer: Buffer::with_capacity(DECODE_BUFFER_CAP), + } + } +} + +impl Decoder for CharsetDecoder { + fn decode(&mut self, buf: &[u8]) -> Result { + let last = buf.is_empty(); + let mut buf_offset = 0; + + if !self.buffer.is_empty() { + let decoded = self.inner.decode(self.buffer.read_buf())?; + self.buffer.consume(decoded); + + if decoded == 0 { + return Ok(buf_offset); + } + } + + loop { + self.buffer.backshift(); + + let (res, read, written, _) = self.charset_decoder.decode_to_utf8( + &buf[buf_offset..], + self.buffer.write_buf(), + last, + ); + buf_offset += read; + self.buffer.advance(written); + + let decoded = self.inner.decode(self.buffer.read_buf())?; + self.buffer.consume(decoded); + + if res == CoderResult::InputEmpty || decoded == 0 { + break; + } + } + + Ok(buf_offset) + } + + fn flush(&mut self) -> Result, ArrowError> { + self.inner.flush() + } + + fn can_flush_early(&self) -> bool { + self.inner.can_flush_early() + } +} + +impl Debug for CharsetDecoder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CharsetDecoder") + .field("inner", &self.inner) + .field("charset_decoder", self.charset_decoder.encoding()) + .finish() + } +} + +mod buffer { + /// A fixed-sized buffer that maintains both + /// a read position and a write position + #[derive(Debug)] + pub struct Buffer { + buf: Box<[u8]>, + read_ptr: usize, + write_ptr: usize, + } + + impl Buffer { + /// Creates a new `Buffer` with the specified capacity + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + Self { + buf: vec![0; capacity].into_boxed_slice(), + read_ptr: 0, + write_ptr: 0, + } + } + + /// Whether there are no more bytes available to be read + pub fn is_empty(&self) -> bool { + self.read_ptr == self.write_ptr + } + + /// Returns the unread portion of the buffer + pub fn read_buf(&self) -> &[u8] { + &self.buf[self.read_ptr..self.write_ptr] + } + + /// Advances the read position by `amount` bytes + pub fn consume(&mut self, amount: usize) { + self.read_ptr += amount; + debug_assert!(self.read_ptr <= self.write_ptr); + } + + /// Returns the portion of the buffer available for writing + pub fn write_buf(&mut self) -> &mut [u8] { + &mut self.buf[self.write_ptr..] + } + + /// Advances the write position by `amount` bytes + pub fn advance(&mut self, amount: usize) { + self.write_ptr += amount; + debug_assert!(self.write_ptr <= self.buf.len()) + } + + /// Moves any unread bytes to the start of the buffer, + /// creating more space for writing new data + pub fn backshift(&mut self) { + self.buf.copy_within(self.read_ptr..self.write_ptr, 0); + self.write_ptr -= self.read_ptr; + self.read_ptr = 0; + } + } +} diff --git a/datafusion/datasource-csv/src/mod.rs b/datafusion/datasource-csv/src/mod.rs index fdfee05d86a79..8b91658421e52 100644 --- a/datafusion/datasource-csv/src/mod.rs +++ b/datafusion/datasource-csv/src/mod.rs @@ -20,6 +20,8 @@ // https://github.com/apache/datafusion/issues/11143 #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] +#[cfg(feature = "encoding_rs")] +mod encoding; pub mod file_format; pub mod source; diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 77a0dc9cf7995..80ee2ff56a77c 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -17,11 +17,12 @@ //! Execution plan for reading CSV files +use datafusion_datasource::decoder::deserialize_reader; use datafusion_datasource::projection::{ProjectionOpener, SplitProjection}; use datafusion_physical_plan::projection::ProjectionExprs; use std::any::Any; use std::fmt; -use std::io::{Read, Seek, SeekFrom}; +use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use std::task::Poll; @@ -46,6 +47,8 @@ use datafusion_physical_plan::{ DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, }; +#[cfg(feature = "encoding_rs")] +use crate::encoding::CharsetDecoder; use crate::file_format::CsvDecoder; use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; @@ -234,6 +237,30 @@ impl CsvOpener { partition_index: 0, } } + + #[cfg(feature = "encoding_rs")] + fn encoding(&self) -> Result> { + match self.config.options.encoding.as_ref() { + Some(enc) => match encoding_rs::Encoding::for_label(enc.as_bytes()) { + Some(enc) => Ok(Some(enc)), + None => Err(DataFusionError::Configuration(format!( + "Unknown character set '{enc}'" + )))?, + }, + None => Ok(None), + } + } + + #[cfg(not(feature = "encoding_rs"))] + fn encoding(&self) -> Result> { + match &self.config.options.encoding { + Some(_) => Err(DataFusionError::NotImplemented( + "The 'encoding_rs' feature must be enabled to decode non-UTF-8 encodings" + .to_owned(), + ))?, + None => Ok(None), + } + } } impl From for Arc { @@ -371,6 +398,7 @@ impl FileOpener for CsvOpener { config.options.truncated_rows = Some(config.truncate_rows()); let file_compression_type = self.file_compression_type.to_owned(); + let encoding = self.encoding()?; if partitioned_file.range.is_some() { assert!( @@ -410,43 +438,59 @@ impl FileOpener for CsvOpener { .get_opts(&partitioned_file.object_meta.location, options) .await?; + let decoder = config.builder().build_decoder(); + let decoder = CsvDecoder::new(decoder); + match result.payload { #[cfg(not(target_arch = "wasm32"))] GetResultPayload::File(mut file, _) => { let is_whole_file_scanned = partitioned_file.range.is_none(); - let decoder = if is_whole_file_scanned { - // Don't seek if no range as breaks FIFO files + let reader = if is_whole_file_scanned { + // Don't seek if no range as that would break FIFO files file_compression_type.convert_read(file)? } else { - file.seek(SeekFrom::Start(result.range.start as _))?; - file_compression_type.convert_read( - file.take((result.range.end - result.range.start) as u64), - )? + let bytes = (result.range.end - result.range.start) as u64; + file.seek(SeekFrom::Start(result.range.start as u64))?; + file_compression_type.convert_read(file.take(bytes))? }; - let mut reader = config.open(decoder)?; + let reader = BufReader::new(reader); + + let mut reader = match encoding { + #[cfg(feature = "encoding_rs")] + Some(enc) => { + let decoder = CharsetDecoder::new(decoder, enc); + deserialize_reader(reader, decoder) + } + None => deserialize_reader(reader, decoder), + }; // Use std::iter::from_fn to wrap execution of iterator's next() method. let iterator = std::iter::from_fn(move || { let mut timer = baseline_metrics.elapsed_compute().timer(); let result = reader.next(); timer.stop(); - result + result.map(|r| r.map_err(Into::into)) }); - Ok(futures::stream::iter(iterator) - .map(|r| r.map_err(Into::into)) - .boxed()) + Ok(futures::stream::iter(iterator).boxed()) } - GetResultPayload::Stream(s) => { - let decoder = config.builder().build_decoder(); - let s = s.map_err(DataFusionError::from); - let input = file_compression_type.convert_stream(s.boxed())?.fuse(); - - let stream = deserialize_stream( - input, - DecoderDeserializer::new(CsvDecoder::new(decoder)), - ); + GetResultPayload::Stream(stream) => { + let stream = stream.map_err(DataFusionError::from).boxed(); + + let stream = file_compression_type.convert_stream(stream)?.fuse(); + + let stream = match encoding { + #[cfg(feature = "encoding_rs")] + Some(enc) => { + let decoder = CharsetDecoder::new(decoder, enc); + deserialize_stream(stream, DecoderDeserializer::new(decoder)) + } + None => { + deserialize_stream(stream, DecoderDeserializer::new(decoder)) + } + }; + Ok(stream.map_err(Into::into).boxed()) } } diff --git a/datafusion/datasource/src/decoder.rs b/datafusion/datasource/src/decoder.rs index 9f9fc0d94bb1c..7cfe25654cee8 100644 --- a/datafusion/datasource/src/decoder.rs +++ b/datafusion/datasource/src/decoder.rs @@ -18,8 +18,7 @@ //! Module containing helper methods for the various file formats //! See write.rs for write related helper methods -use ::arrow::array::RecordBatch; - +use arrow::array::RecordBatch; use arrow::error::ArrowError; use bytes::Buf; use bytes::Bytes; @@ -29,19 +28,9 @@ use futures::stream::BoxStream; use futures::{Stream, ready}; use std::collections::VecDeque; use std::fmt; +use std::io::BufRead; use std::task::Poll; -/// Possible outputs of a [`BatchDeserializer`]. -#[derive(Debug, PartialEq)] -pub enum DeserializerOutput { - /// A successfully deserialized [`RecordBatch`]. - RecordBatch(RecordBatch), - /// The deserializer requires more data to make progress. - RequiresMoreData, - /// The input data has been exhausted. - InputExhausted, -} - /// Trait defining a scheme for deserializing byte streams into structured data. /// Implementors of this trait are responsible for converting raw bytes into /// `RecordBatch` objects. @@ -49,8 +38,8 @@ pub trait BatchDeserializer: Send + fmt::Debug { /// Feeds a message for deserialization, updating the internal state of /// this `BatchDeserializer`. Note that one can call this function multiple /// times before calling `next`, which will queue multiple messages for - /// deserialization. Returns the number of bytes consumed. - fn digest(&mut self, message: T) -> usize; + /// deserialization. + fn digest(&mut self, message: T); /// Attempts to deserialize any pending messages and returns a /// `DeserializerOutput` to indicate progress. @@ -61,6 +50,17 @@ pub trait BatchDeserializer: Send + fmt::Debug { fn finish(&mut self); } +/// Possible outputs of a [`BatchDeserializer`]. +#[derive(Debug, PartialEq)] +pub enum DeserializerOutput { + /// A successfully deserialized [`RecordBatch`]. + RecordBatch(RecordBatch), + /// The deserializer requires more data to make progress. + RequiresMoreData, + /// The input data has been exhausted. + InputExhausted, +} + /// A general interface for decoders such as [`arrow::json::reader::Decoder`] and /// [`arrow::csv::reader::Decoder`]. Defines an interface similar to /// [`Decoder::decode`] and [`Decoder::flush`] methods, but also includes @@ -86,24 +86,37 @@ pub trait Decoder: Send + fmt::Debug { fn can_flush_early(&self) -> bool; } -impl fmt::Debug for DecoderDeserializer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Deserializer") - .field("buffered_queue", &self.buffered_queue) - .field("finalized", &self.finalized) - .finish() +/// A generic, decoder-based deserialization scheme for processing encoded data. +/// +/// This struct is responsible for converting a stream of bytes, which represent +/// encoded data, into a stream of `RecordBatch` objects, following the specified +/// schema and formatting options. It also handles any buffering necessary to satisfy +/// the `Decoder` interface. +pub struct DecoderDeserializer { + /// The underlying decoder used for deserialization + pub(crate) decoder: T, + /// The buffer used to store the remaining bytes to be decoded + pub(crate) buffered_queue: VecDeque, + /// Whether the input stream has been fully consumed + pub(crate) finalized: bool, +} + +impl DecoderDeserializer { + /// Creates a new `DecoderDeserializer` with the provided decoder. + pub fn new(decoder: T) -> Self { + DecoderDeserializer { + decoder, + buffered_queue: VecDeque::new(), + finalized: false, + } } } impl BatchDeserializer for DecoderDeserializer { - fn digest(&mut self, message: Bytes) -> usize { - if message.is_empty() { - return 0; + fn digest(&mut self, message: Bytes) { + if !message.is_empty() { + self.buffered_queue.push_back(message); } - - let consumed = message.len(); - self.buffered_queue.push_back(message); - consumed } fn next(&mut self) -> Result { @@ -139,29 +152,12 @@ impl BatchDeserializer for DecoderDeserializer { } } -/// A generic, decoder-based deserialization scheme for processing encoded data. -/// -/// This struct is responsible for converting a stream of bytes, which represent -/// encoded data, into a stream of `RecordBatch` objects, following the specified -/// schema and formatting options. It also handles any buffering necessary to satisfy -/// the `Decoder` interface. -pub struct DecoderDeserializer { - /// The underlying decoder used for deserialization - pub(crate) decoder: T, - /// The buffer used to store the remaining bytes to be decoded - pub(crate) buffered_queue: VecDeque, - /// Whether the input stream has been fully consumed - pub(crate) finalized: bool, -} - -impl DecoderDeserializer { - /// Creates a new `DecoderDeserializer` with the provided decoder. - pub fn new(decoder: T) -> Self { - DecoderDeserializer { - decoder, - buffered_queue: VecDeque::new(), - finalized: false, - } +impl fmt::Debug for DecoderDeserializer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Deserializer") + .field("buffered_queue", &self.buffered_queue) + .field("finalized", &self.finalized) + .finish() } } @@ -191,3 +187,27 @@ pub fn deserialize_stream<'a>( }) .boxed() } + +/// Creates an iterator of [`RecordBatch`]es that consumes bytes from an inner [`BufRead`] +/// and deserializes them using the provided decoder. +pub fn deserialize_reader<'a>( + mut reader: impl BufRead + Send + 'a, + mut decoder: impl Decoder + Send + 'a, +) -> Box> + Send + 'a> { + let mut read = move || { + loop { + let buf = reader.fill_buf()?; + + let decoded = decoder.decode(buf)?; + reader.consume(decoded); + + if decoded == 0 || decoder.can_flush_early() { + break; + } + } + + decoder.flush() + }; + + Box::new(std::iter::from_fn(move || read().transpose())) +} From 29868a3336afb96c1af1a01c8b4070d2abb70fa4 Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Sun, 1 Mar 2026 14:29:04 +1100 Subject: [PATCH 03/11] Fix clippy issues --- datafusion/datasource-csv/src/source.rs | 4 ---- datafusion/datasource/src/decoder.rs | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 80ee2ff56a77c..3bc05e5be7d05 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -185,10 +185,6 @@ impl CsvSource { } impl CsvSource { - fn open(&self, reader: R) -> Result> { - Ok(self.builder().build(reader)?) - } - fn builder(&self) -> csv::ReaderBuilder { let mut builder = csv::ReaderBuilder::new(Arc::clone(self.table_schema.file_schema())) diff --git a/datafusion/datasource/src/decoder.rs b/datafusion/datasource/src/decoder.rs index 7cfe25654cee8..ef759f0fb7e2f 100644 --- a/datafusion/datasource/src/decoder.rs +++ b/datafusion/datasource/src/decoder.rs @@ -174,7 +174,7 @@ pub fn deserialize_stream<'a>( futures::stream::poll_fn(move |cx| { loop { match ready!(input.poll_next_unpin(cx)).transpose()? { - Some(b) => _ = deserializer.digest(b), + Some(b) => deserializer.digest(b), None => deserializer.finish(), }; @@ -192,7 +192,7 @@ pub fn deserialize_stream<'a>( /// and deserializes them using the provided decoder. pub fn deserialize_reader<'a>( mut reader: impl BufRead + Send + 'a, - mut decoder: impl Decoder + Send + 'a, + mut decoder: impl Decoder + 'a, ) -> Box> + Send + 'a> { let mut read = move || { loop { From fe5c1e480b8f8db16482f857074d099bf3e8d8d7 Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Sun, 1 Mar 2026 15:57:02 +1100 Subject: [PATCH 04/11] Add SHIFT-JIS test --- Cargo.lock | 1 + datafusion/core/Cargo.toml | 2 + .../core/src/datasource/file_format/csv.rs | 45 +++++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index f0dfee52cabf2..15a3b64fb3a13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1756,6 +1756,7 @@ dependencies = [ "datafusion-session", "datafusion-sql", "doc-comment", + "encoding_rs", "env_logger", "flate2", "futures", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 8965948a0f4e2..0e55d5b2bce8a 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -68,6 +68,7 @@ default = [ "recursive_protection", "sql", ] +encoding_rs = ["datafusion-datasource-csv/encoding_rs"] encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = ["datafusion-physical-plan/force_hash_collisions", "datafusion-common/force_hash_collisions"] @@ -171,6 +172,7 @@ datafusion-functions-window-common = { workspace = true } datafusion-macros = { workspace = true } datafusion-physical-optimizer = { workspace = true } doc-comment = { workspace = true } +encoding_rs = { workspace = true } env_logger = { workspace = true } glob = { workspace = true } insta = { workspace = true } diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 51d799a5b65c1..47565f537ca32 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1619,4 +1619,49 @@ mod tests { Ok(()) } + + #[cfg(feature = "encoding_rs")] + #[tokio::test] + async fn test_read_shift_jis_csv() -> Result<()> { + use std::io::Write; + + // Encode a test CSV into SHIFT-JIS + let data = r#"ID,Name,Price,Description,Notes +001,山本 大輔,\2945,桜餅と抹茶のセット,数量限定 +002,加藤 由美,\9575,和牛ステーキセット,取り寄せ中 +003,田中 太郎,\1853,抹茶アイスクリーム,ポイント2倍 +004,渡辺 さくら,\9494,和牛ステーキセット,送料無料 +005,加藤 由美,\558,和牛ステーキセット,新商品 +006,渡辺 さくら,\7704,天ぷら盛り合わせ,割引対象外 +007,田中 太郎,\212,桜餅と抹茶のセット,取り寄せ中 +008,中村 陽子,\8847,和牛ステーキセット,期間限定 +009,伊藤 健太,\5997,季節の野菜カレー,お一人様1点限り +010,高橋 美咲,\6594,季節の野菜カレー,冷凍保存"#; + let (data, _, _) = encoding_rs::SHIFT_JIS.encode(data); + + // Write the CSV data to a temp file + let mut tmp = tempfile::Builder::new().suffix(".csv").tempfile()?; + tmp.write_all(&*data)?; + let path = tmp.path().to_str().unwrap().to_string(); + + // Read the file + let ctx = SessionContext::new(); + let opts = CsvReadOptions::new().has_header(true); + let batches = ctx.read_csv(path, opts).await?.collect().await?; + + // Check + let num_rows = batches.iter().map(|b| b.num_rows()).sum::(); + assert_eq!(num_rows, 10); + + let names = batches[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(names.value(0), "山本 大輔"); + assert_eq!(names.value(1), "加藤 由美"); + assert_eq!(names.value(2), "田中 太郎"); + + Ok(()) + } } From 2720b7e89e204e8d9ec2d187cf8111a2b696cf15 Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Sun, 1 Mar 2026 16:32:49 +1100 Subject: [PATCH 05/11] Add license header --- datafusion/datasource-csv/src/encoding.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/datasource-csv/src/encoding.rs b/datafusion/datasource-csv/src/encoding.rs index d948e6419f062..caf22a07451e7 100644 --- a/datafusion/datasource-csv/src/encoding.rs +++ b/datafusion/datasource-csv/src/encoding.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::fmt::Debug; use arrow::array::RecordBatch; From 12a841441b7389ecbb57855211556d8eb3465118 Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Mon, 2 Mar 2026 23:39:58 +1100 Subject: [PATCH 06/11] Rename "encoding" to "charset" for clarity, add in missing functions, make schema inference charset-aware --- datafusion/common/src/config.rs | 9 ++- .../core/src/datasource/file_format/csv.rs | 2 +- .../src/datasource/file_format/options.rs | 10 +++ .../src/{encoding.rs => charset.rs} | 65 ++++++++++++++++++- datafusion/datasource-csv/src/file_format.rs | 21 +++++- datafusion/datasource-csv/src/mod.rs | 17 ++++- datafusion/datasource-csv/src/source.rs | 35 ++-------- .../proto/datafusion_common.proto | 2 +- datafusion/proto-common/src/from_proto/mod.rs | 2 +- datafusion/proto-common/src/to_proto/mod.rs | 2 +- .../src/generated/datafusion_proto_common.rs | 2 +- .../proto/src/logical_plan/file_formats.rs | 6 +- 12 files changed, 130 insertions(+), 43 deletions(-) rename datafusion/datasource-csv/src/{encoding.rs => charset.rs} (72%) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index d55fa8ff2daf5..c045d609f1934 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -2921,7 +2921,7 @@ config_namespace! { /// /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting. pub newlines_in_values: Option, default = None - pub encoding: Option, default = None + pub charset: Option, default = None pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED /// Compression level for the output file. The valid range depends on the /// compression algorithm: @@ -3034,6 +3034,13 @@ impl CsvOptions { self } + /// Specifies the character encoding the file is encoded with. + /// - defaults to UTF-8 + pub fn with_charset(mut self, charset: impl Into) -> Self { + self.charset = Some(charset.into()); + self + } + /// Set a `CompressionTypeVariant` of CSV /// - defaults to `CompressionTypeVariant::UNCOMPRESSED` pub fn with_file_compression_type( diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 47565f537ca32..d22aeec0f154c 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -1646,7 +1646,7 @@ mod tests { // Read the file let ctx = SessionContext::new(); - let opts = CsvReadOptions::new().has_header(true); + let opts = CsvReadOptions::new().has_header(true).charset("SHIFT-JIS"); let batches = ctx.read_csv(path, opts).await?.collect().await?; // Check diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index bd0ac36087381..68e8596e12141 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -85,6 +85,8 @@ pub struct CsvReadOptions<'a> { pub file_extension: &'a str, /// Partition Columns pub table_partition_cols: Vec<(String, DataType)>, + /// Character encoding + pub charset: Option<&'a str>, /// File compression type pub file_compression_type: FileCompressionType, /// Indicates how the file is sorted @@ -118,6 +120,7 @@ impl<'a> CsvReadOptions<'a> { newlines_in_values: false, file_extension: DEFAULT_CSV_EXTENSION, table_partition_cols: vec![], + charset: None, file_compression_type: FileCompressionType::UNCOMPRESSED, file_sort_order: vec![], comment: None, @@ -209,6 +212,12 @@ impl<'a> CsvReadOptions<'a> { self } + /// Configure the character set encoding + pub fn charset(mut self, charset: &'a str) -> Self { + self.charset = Some(charset); + self + } + /// Configure file compression type pub fn file_compression_type( mut self, @@ -633,6 +642,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { .with_terminator(self.terminator) .with_newlines_in_values(self.newlines_in_values) .with_schema_infer_max_rec(self.schema_infer_max_records) + .with_charset(self.charset.map(ToOwned::to_owned)) .with_file_compression_type(self.file_compression_type.to_owned()) .with_null_regex(self.null_regex.clone()) .with_truncated_rows(self.truncated_rows); diff --git a/datafusion/datasource-csv/src/encoding.rs b/datafusion/datasource-csv/src/charset.rs similarity index 72% rename from datafusion/datasource-csv/src/encoding.rs rename to datafusion/datasource-csv/src/charset.rs index caf22a07451e7..2d0c3dbd3638b 100644 --- a/datafusion/datasource-csv/src/encoding.rs +++ b/datafusion/datasource-csv/src/charset.rs @@ -16,18 +16,31 @@ // under the License. use std::fmt::Debug; +use std::io::{BufRead, Read}; use arrow::array::RecordBatch; use arrow::error::ArrowError; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_datasource::decoder::Decoder; -use encoding_rs::{CoderResult, Encoding}; +use encoding_rs::{CoderResult, Encoding, UTF_8}; use self::buffer::Buffer; /// Default capacity of the buffer used to decode non-UTF-8 charset streams static DECODE_BUFFER_CAP: usize = 8 * 1024; +pub fn lookup_charset(enc: Option<&str>) -> Result> { + match enc { + Some(enc) => match Encoding::for_label(enc.as_bytes()) { + Some(enc) => Ok(Some(enc).filter(|enc| *enc != UTF_8)), + None => Err(DataFusionError::Configuration(format!( + "Unknown character set '{enc}'" + )))?, + }, + None => Ok(None), + } +} + /// A `Decoder` that decodes input bytes from the specified character encoding /// to UTF-8 before passing them onto the inner `Decoder`. pub struct CharsetDecoder { @@ -100,6 +113,54 @@ impl Debug for CharsetDecoder { } } +pub struct CharsetReader { + inner: R, + charset_decoder: encoding_rs::Decoder, + buffer: Buffer, +} + +impl CharsetReader { + pub fn new(inner: R, encoding: &'static Encoding) -> Self { + Self { + inner, + charset_decoder: encoding.new_decoder(), + buffer: Buffer::with_capacity(DECODE_BUFFER_CAP), + } + } +} + +impl Read for CharsetReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let src = self.fill_buf()?; + let len = src.len().min(buf.len()); + buf[..len].copy_from_slice(&src[..len]); + Ok(len) + } +} + +impl BufRead for CharsetReader { + fn fill_buf(&mut self) -> std::io::Result<&[u8]> { + if self.buffer.is_empty() { + self.buffer.backshift(); + + let buf = self.inner.fill_buf()?; + let (_, read, written, _) = self.charset_decoder.decode_to_utf8( + buf, + self.buffer.write_buf(), + buf.is_empty(), + ); + self.inner.consume(read); + self.buffer.advance(written); + } + + Ok(self.buffer.read_buf()) + } + + fn consume(&mut self, amount: usize) { + self.buffer.consume(amount); + } +} + mod buffer { /// A fixed-sized buffer that maintains both /// a read position and a write position diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs index 7a253d81db9f8..ea9a5e92f02a9 100644 --- a/datafusion/datasource-csv/src/file_format.rs +++ b/datafusion/datasource-csv/src/file_format.rs @@ -22,6 +22,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug}; use std::sync::Arc; +use crate::charset::lookup_charset; use crate::source::CsvSource; use arrow::array::RecordBatch; @@ -294,6 +295,13 @@ impl CsvFormat { self } + /// Sets the character encoding of the CSV. + /// Defaults to UTF-8 if unspecified. + pub fn with_charset(mut self, charset: Option) -> Self { + self.options.charset = charset; + self + } + /// Set a `FileCompressionType` of CSV /// - defaults to `FileCompressionType::UNCOMPRESSED` pub fn with_file_compression_type( @@ -540,6 +548,8 @@ impl CsvFormat { pin_mut!(stream); + let charset = lookup_charset(self.options.charset.as_deref())?; + while let Some(chunk) = stream.next().await.transpose()? { record_number += 1; let first_chunk = record_number == 0; @@ -569,8 +579,15 @@ impl CsvFormat { format = format.with_comment(comment); } - let (Schema { fields, .. }, records_read) = - format.infer_schema(chunk.reader(), Some(records_to_read))?; + let (Schema { fields, .. }, records_read) = match charset { + #[cfg(feature = "encoding_rs")] + Some(enc) => { + use crate::charset::CharsetReader; + let reader = CharsetReader::new(chunk.reader(), enc); + format.infer_schema(reader, Some(records_to_read))? + } + None => format.infer_schema(chunk.reader(), Some(records_to_read))?, + }; records_to_read -= records_read; total_records_read += records_read; diff --git a/datafusion/datasource-csv/src/mod.rs b/datafusion/datasource-csv/src/mod.rs index 8b91658421e52..34759e875aaf2 100644 --- a/datafusion/datasource-csv/src/mod.rs +++ b/datafusion/datasource-csv/src/mod.rs @@ -21,7 +21,7 @@ #![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))] #[cfg(feature = "encoding_rs")] -mod encoding; +mod charset; pub mod file_format; pub mod source; @@ -32,6 +32,7 @@ use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_datasource::{file::FileSource, file_scan_config::FileScanConfig}; use datafusion_execution::object_store::ObjectStoreUrl; + pub use file_format::*; /// Returns a [`FileScanConfig`] for given `file_groups` @@ -45,3 +46,17 @@ pub fn partitioned_csv_config( .build(), ) } + +#[cfg(not(feature = "encoding_rs"))] +mod encoding { + use datafusion_common::{DataFusionError, Result}; + + pub fn find_encoding(enc: Option<&str>) -> Result> { + match enc { + Some(_) => Err(DataFusionError::NotImplemented(format!( + "The 'encoding_rs' feature must be enabled to decode non-UTF-8 encodings" + )))?, + None => Ok(None), + } + } +} diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 3bc05e5be7d05..775d45ee9cf79 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -47,8 +47,7 @@ use datafusion_physical_plan::{ DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, }; -#[cfg(feature = "encoding_rs")] -use crate::encoding::CharsetDecoder; +use crate::charset::lookup_charset; use crate::file_format::CsvDecoder; use futures::{StreamExt, TryStreamExt}; use object_store::buffered::BufWriter; @@ -233,30 +232,6 @@ impl CsvOpener { partition_index: 0, } } - - #[cfg(feature = "encoding_rs")] - fn encoding(&self) -> Result> { - match self.config.options.encoding.as_ref() { - Some(enc) => match encoding_rs::Encoding::for_label(enc.as_bytes()) { - Some(enc) => Ok(Some(enc)), - None => Err(DataFusionError::Configuration(format!( - "Unknown character set '{enc}'" - )))?, - }, - None => Ok(None), - } - } - - #[cfg(not(feature = "encoding_rs"))] - fn encoding(&self) -> Result> { - match &self.config.options.encoding { - Some(_) => Err(DataFusionError::NotImplemented( - "The 'encoding_rs' feature must be enabled to decode non-UTF-8 encodings" - .to_owned(), - ))?, - None => Ok(None), - } - } } impl From for Arc { @@ -394,7 +369,7 @@ impl FileOpener for CsvOpener { config.options.truncated_rows = Some(config.truncate_rows()); let file_compression_type = self.file_compression_type.to_owned(); - let encoding = self.encoding()?; + let charset = lookup_charset(self.config.options.charset.as_deref())?; if partitioned_file.range.is_some() { assert!( @@ -452,9 +427,10 @@ impl FileOpener for CsvOpener { let reader = BufReader::new(reader); - let mut reader = match encoding { + let mut reader = match charset { #[cfg(feature = "encoding_rs")] Some(enc) => { + use crate::charset::CharsetDecoder; let decoder = CharsetDecoder::new(decoder, enc); deserialize_reader(reader, decoder) } @@ -476,9 +452,10 @@ impl FileOpener for CsvOpener { let stream = file_compression_type.convert_stream(stream)?.fuse(); - let stream = match encoding { + let stream = match charset { #[cfg(feature = "encoding_rs")] Some(enc) => { + use crate::charset::CharsetDecoder; let decoder = CharsetDecoder::new(decoder, enc); deserialize_stream(stream, DecoderDeserializer::new(decoder)) } diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 88c7f6430a799..8600baeb5a339 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -476,7 +476,7 @@ message CsvOptions { bytes terminator = 17; // Optional terminator character as a byte bytes truncated_rows = 18; // Indicates if truncated rows are allowed optional uint32 compression_level = 19; // Optional compression level - string encoding = 20; // Optional character encoding + string charset = 20; // Optional character encoding } // Options controlling CSV format diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index c7100f5e3210a..2a756da8d3873 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -984,7 +984,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { escape: proto_opts.escape.first().copied(), double_quote: proto_opts.double_quote.first().map(|h| *h != 0), newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0), - encoding: (!proto_opts.encoding.is_empty()) + charset: (!proto_opts.encoding.is_empty()) .then(|| proto_opts.encoding.clone()), compression: proto_opts.compression().into(), compression_level: proto_opts.compression_level, diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 014875281cefd..2b5fd7795f860 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -986,7 +986,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { newlines_in_values: opts .newlines_in_values .map_or_else(Vec::new, |h| vec![h as u8]), - encoding: opts.encoding.clone().unwrap_or_default(), + encoding: opts.charset.clone().unwrap_or_default(), compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), date_format: opts.date_format.clone().unwrap_or_default(), diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index f1b310357c6d9..1a8273bea36e2 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -674,7 +674,7 @@ pub struct CsvOptions { pub compression_level: ::core::option::Option, /// Optional character encoding #[prost(string, tag = "20")] - pub encoding: ::prost::alloc::string::String, + pub charset: ::prost::alloc::string::String, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index bf0b472a4096c..4f02de697b96e 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -45,7 +45,7 @@ impl CsvOptionsProto { terminator: options.terminator.map_or(vec![], |v| vec![v]), escape: options.escape.map_or(vec![], |v| vec![v]), double_quote: options.double_quote.map_or(vec![], |v| vec![v as u8]), - encoding: options.encoding.clone().unwrap_or_default(), + charset: options.charset.clone().unwrap_or_default(), compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), date_format: options.date_format.clone().unwrap_or_default(), @@ -96,8 +96,8 @@ impl From<&CsvOptionsProto> for CsvOptions { } else { None }, - encoding: if !proto.encoding.is_empty() { - Some(proto.encoding.clone()) + charset: if !proto.charset.is_empty() { + Some(proto.charset.clone()) } else { None }, From b7b8a46dc1871bda93234db6c684851ac89ac130 Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Tue, 3 Mar 2026 00:24:54 +1100 Subject: [PATCH 07/11] Factor out `CharsetDecoder`, fix bugs, add feature flag to README --- README.md | 1 + datafusion/datasource-csv/src/charset.rs | 147 +++++++++++++++-------- datafusion/datasource-csv/src/source.rs | 8 +- 3 files changed, 100 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 630d4295bd427..661610205f084 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,7 @@ Optional features: - `avro`: support for reading the [Apache Avro] format - `backtrace`: include backtrace information in error messages +- `encoding_rs`: support for reading files with character encodings other than UTF-8 - `parquet_encryption`: support for using [Parquet Modular Encryption] - `serde`: enable arrow-schema's `serde` feature diff --git a/datafusion/datasource-csv/src/charset.rs b/datafusion/datasource-csv/src/charset.rs index 2d0c3dbd3638b..d5e17e174ff0a 100644 --- a/datafusion/datasource-csv/src/charset.rs +++ b/datafusion/datasource-csv/src/charset.rs @@ -41,32 +41,29 @@ pub fn lookup_charset(enc: Option<&str>) -> Result> { } } -/// A `Decoder` that decodes input bytes from the specified character encoding -/// to UTF-8 before passing them onto the inner `Decoder`. -pub struct CharsetDecoder { +/// A record batch `Decoder` that decodes input bytes from the specified +/// character encoding to UTF-8 before passing them onto the inner `Decoder`. +#[derive(Debug)] +pub struct CharsetBatchDecoder { inner: T, - charset_decoder: encoding_rs::Decoder, - buffer: Buffer, + decoder: CharsetDecoder, } -impl CharsetDecoder { +impl CharsetBatchDecoder { pub fn new(inner: T, encoding: &'static Encoding) -> Self { - Self { - inner, - charset_decoder: encoding.new_decoder(), - buffer: Buffer::with_capacity(DECODE_BUFFER_CAP), - } + let decoder = CharsetDecoder::new(encoding); + Self { inner, decoder } } } -impl Decoder for CharsetDecoder { +impl Decoder for CharsetBatchDecoder { fn decode(&mut self, buf: &[u8]) -> Result { let last = buf.is_empty(); let mut buf_offset = 0; - if !self.buffer.is_empty() { - let decoded = self.inner.decode(self.buffer.read_buf())?; - self.buffer.consume(decoded); + if !self.decoder.is_empty() { + let decoded = self.inner.decode(self.decoder.read())?; + self.decoder.consume(decoded); if decoded == 0 { return Ok(buf_offset); @@ -74,20 +71,13 @@ impl Decoder for CharsetDecoder { } loop { - self.buffer.backshift(); - - let (res, read, written, _) = self.charset_decoder.decode_to_utf8( - &buf[buf_offset..], - self.buffer.write_buf(), - last, - ); + let (read, input_empty) = self.decoder.fill(&buf[buf_offset..], last); buf_offset += read; - self.buffer.advance(written); - let decoded = self.inner.decode(self.buffer.read_buf())?; - self.buffer.consume(decoded); + let decoded = self.inner.decode(self.decoder.read())?; + self.decoder.consume(decoded); - if res == CoderResult::InputEmpty || decoded == 0 { + if input_empty || decoded == 0 { break; } } @@ -104,28 +94,18 @@ impl Decoder for CharsetDecoder { } } -impl Debug for CharsetDecoder { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("CharsetDecoder") - .field("inner", &self.inner) - .field("charset_decoder", self.charset_decoder.encoding()) - .finish() - } -} - +/// A `BufRead` adapter that decodes input bytes from the +/// specified character encoding to UTF-8. +#[derive(Debug)] pub struct CharsetReader { inner: R, - charset_decoder: encoding_rs::Decoder, - buffer: Buffer, + decoder: CharsetDecoder, } impl CharsetReader { pub fn new(inner: R, encoding: &'static Encoding) -> Self { - Self { - inner, - charset_decoder: encoding.new_decoder(), - buffer: Buffer::with_capacity(DECODE_BUFFER_CAP), - } + let decoder = CharsetDecoder::new(encoding); + Self { inner, decoder } } } @@ -140,27 +120,90 @@ impl Read for CharsetReader { impl BufRead for CharsetReader { fn fill_buf(&mut self) -> std::io::Result<&[u8]> { - if self.buffer.is_empty() { - self.buffer.backshift(); - + if self.decoder.needs_input() { let buf = self.inner.fill_buf()?; - let (_, read, written, _) = self.charset_decoder.decode_to_utf8( - buf, - self.buffer.write_buf(), - buf.is_empty(), - ); + let (read, _) = self.decoder.fill(buf, buf.is_empty()); self.inner.consume(read); - self.buffer.advance(written); } - Ok(self.buffer.read_buf()) + Ok(self.decoder.read()) + } + + fn consume(&mut self, amount: usize) { + self.decoder.consume(amount); + } +} + +/// Converts bytes from some character encoding to UTF-8, +/// using an internal fixed-size buffer +pub struct CharsetDecoder { + charset_decoder: encoding_rs::Decoder, + buffer: Buffer, + finished: bool, +} + +impl CharsetDecoder { + /// Creates a new `CharsetDecoder`. + fn new(encoding: &'static Encoding) -> Self { + Self { + charset_decoder: encoding.new_decoder(), + buffer: Buffer::with_capacity(DECODE_BUFFER_CAP), + finished: false, + } + } + + /// Returns `true` if the internal buffer is empty. + fn is_empty(&self) -> bool { + self.buffer.is_empty() + } + + /// Returns `true` if the decoder needs more input to make progress. + fn needs_input(&self) -> bool { + !self.finished && self.buffer.is_empty() + } + + /// Fills the internal buffer by decoding the provided bytes, returning + /// the number of bytes consumed and whether the input was exhausted. + fn fill(&mut self, src: &[u8], last: bool) -> (usize, bool) { + if self.finished { + return (0, true); + } + + self.buffer.backshift(); + + let dst = self.buffer.write_buf(); + let (res, read, written, _) = self.charset_decoder.decode_to_utf8(src, dst, last); + self.buffer.advance(written); + + if last && res == CoderResult::InputEmpty { + self.finished = true; + } + + (read, res == CoderResult::InputEmpty) + } + + /// Returns the unread decoded bytes in the internal buffer. + fn read(&self) -> &[u8] { + self.buffer.read_buf() } + /// Marks the given amount of bytes from the internal buffer as having been read. + /// Subsequent calls to `read` only return bytes that have not been marked as read. fn consume(&mut self, amount: usize) { self.buffer.consume(amount); } } +impl Debug for CharsetDecoder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CharsetDecoder") + .field("charset_decoder", self.charset_decoder.encoding()) + .field("buffer", &self.buffer) + .field("finished", &self.finished) + .finish() + } +} + mod buffer { /// A fixed-sized buffer that maintains both /// a read position and a write position diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs index 775d45ee9cf79..558be2687a225 100644 --- a/datafusion/datasource-csv/src/source.rs +++ b/datafusion/datasource-csv/src/source.rs @@ -430,8 +430,8 @@ impl FileOpener for CsvOpener { let mut reader = match charset { #[cfg(feature = "encoding_rs")] Some(enc) => { - use crate::charset::CharsetDecoder; - let decoder = CharsetDecoder::new(decoder, enc); + use crate::charset::CharsetBatchDecoder; + let decoder = CharsetBatchDecoder::new(decoder, enc); deserialize_reader(reader, decoder) } None => deserialize_reader(reader, decoder), @@ -455,8 +455,8 @@ impl FileOpener for CsvOpener { let stream = match charset { #[cfg(feature = "encoding_rs")] Some(enc) => { - use crate::charset::CharsetDecoder; - let decoder = CharsetDecoder::new(decoder, enc); + use crate::charset::CharsetBatchDecoder; + let decoder = CharsetBatchDecoder::new(decoder, enc); deserialize_stream(stream, DecoderDeserializer::new(decoder)) } None => { From 26b4ce18cc96e25cdbacb9ff0cfe5fe57511a4aa Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Tue, 3 Mar 2026 00:39:02 +1100 Subject: [PATCH 08/11] Fix module name --- datafusion/datasource-csv/src/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-csv/src/mod.rs b/datafusion/datasource-csv/src/mod.rs index 34759e875aaf2..bb2efd9e02ed2 100644 --- a/datafusion/datasource-csv/src/mod.rs +++ b/datafusion/datasource-csv/src/mod.rs @@ -48,10 +48,11 @@ pub fn partitioned_csv_config( } #[cfg(not(feature = "encoding_rs"))] -mod encoding { +mod charset { + use core::convert::Infallible; use datafusion_common::{DataFusionError, Result}; - pub fn find_encoding(enc: Option<&str>) -> Result> { + pub fn lookup_charset(enc: Option<&str>) -> Result> { match enc { Some(_) => Err(DataFusionError::NotImplemented(format!( "The 'encoding_rs' feature must be enabled to decode non-UTF-8 encodings" From 7cc0dbe3a1320f1381fc1a12348ae37474938f71 Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Tue, 3 Mar 2026 00:45:31 +1100 Subject: [PATCH 09/11] Fix more naming inconsistencies --- datafusion/proto-common/src/from_proto/mod.rs | 3 +-- datafusion/proto-common/src/generated/prost.rs | 2 +- datafusion/proto-common/src/to_proto/mod.rs | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 2a756da8d3873..ee9e01c296521 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -984,8 +984,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { escape: proto_opts.escape.first().copied(), double_quote: proto_opts.double_quote.first().map(|h| *h != 0), newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0), - charset: (!proto_opts.encoding.is_empty()) - .then(|| proto_opts.encoding.clone()), + charset: (!proto_opts.charset.is_empty()).then(|| proto_opts.charset.clone()), compression: proto_opts.compression().into(), compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index f1b310357c6d9..1a8273bea36e2 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -674,7 +674,7 @@ pub struct CsvOptions { pub compression_level: ::core::option::Option, /// Optional character encoding #[prost(string, tag = "20")] - pub encoding: ::prost::alloc::string::String, + pub charset: ::prost::alloc::string::String, } /// Options controlling CSV format #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 2b5fd7795f860..3f334d7fbf830 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -986,7 +986,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { newlines_in_values: opts .newlines_in_values .map_or_else(Vec::new, |h| vec![h as u8]), - encoding: opts.charset.clone().unwrap_or_default(), + charset: opts.charset.clone().unwrap_or_default(), compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), date_format: opts.date_format.clone().unwrap_or_default(), From 2eca508b34b6ac31f86bc800b5b29f67288b8f0e Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Tue, 3 Mar 2026 00:54:00 +1100 Subject: [PATCH 10/11] regen.sh --- .../proto-common/src/generated/pbjson.rs | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 11d9f8db4e09f..840b5470eb3f8 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -1701,7 +1701,7 @@ impl serde::Serialize for CsvOptions { if self.compression_level.is_some() { len += 1; } - if !self.encoding.is_empty() { + if !self.charset.is_empty() { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?; @@ -1784,8 +1784,8 @@ impl serde::Serialize for CsvOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } - if !self.encoding.is_empty() { - struct_ser.serialize_field("encoding", &self.encoding)?; + if !self.charset.is_empty() { + struct_ser.serialize_field("charset", &self.charset)?; } struct_ser.end() } @@ -1829,7 +1829,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "truncatedRows", "compression_level", "compressionLevel", - "encoding", + "charset", ]; #[allow(clippy::enum_variant_names)] @@ -1853,7 +1853,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { Terminator, TruncatedRows, CompressionLevel, - Encoding, + Charset, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -1894,7 +1894,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "terminator" => Ok(GeneratedField::Terminator), "truncatedRows" | "truncated_rows" => Ok(GeneratedField::TruncatedRows), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), - "encoding" => Ok(GeneratedField::Encoding), + "charset" => Ok(GeneratedField::Charset), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -1933,7 +1933,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut terminator__ = None; let mut truncated_rows__ = None; let mut compression_level__ = None; - let mut encoding__ = None; + let mut charset__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::HasHeader => { @@ -2072,11 +2072,11 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } - GeneratedField::Encoding => { - if encoding__.is_some() { - return Err(serde::de::Error::duplicate_field("encoding")); + GeneratedField::Charset => { + if charset__.is_some() { + return Err(serde::de::Error::duplicate_field("charset")); } - encoding__ = Some(map_.next_value()?); + charset__ = Some(map_.next_value()?); } } } @@ -2100,7 +2100,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { terminator: terminator__.unwrap_or_default(), truncated_rows: truncated_rows__.unwrap_or_default(), compression_level: compression_level__, - encoding: encoding__.unwrap_or_default(), + charset: charset__.unwrap_or_default(), }) } } From 09a2357798ac496f26ae98c7b8d6ac772fbdc96b Mon Sep 17 00:00:00 2001 From: Alexander Rafferty Date: Tue, 3 Mar 2026 01:19:57 +1100 Subject: [PATCH 11/11] Fix clippy issue --- datafusion/datasource-csv/src/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/datasource-csv/src/mod.rs b/datafusion/datasource-csv/src/mod.rs index bb2efd9e02ed2..72e3c29b00bc3 100644 --- a/datafusion/datasource-csv/src/mod.rs +++ b/datafusion/datasource-csv/src/mod.rs @@ -54,9 +54,10 @@ mod charset { pub fn lookup_charset(enc: Option<&str>) -> Result> { match enc { - Some(_) => Err(DataFusionError::NotImplemented(format!( + Some(_) => Err(DataFusionError::NotImplemented( "The 'encoding_rs' feature must be enabled to decode non-UTF-8 encodings" - )))?, + .to_string(), + ))?, None => Ok(None), } }