From 1fa37104d654c5a4895b2d132dcd497c8eb260ae Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Wed, 27 May 2026 04:04:42 +0000 Subject: [PATCH] [MINOR][VL] Drop modify_arrow_dataset_scan_option.patch This Arrow patch added a native CSV/dataset scan-option API (CsvFragmentScanOptions::from, DeserializeMap, mapToExpressionLiteral, etc.) that was originally consumed by Gluten's native Arrow CSV reader path. That path is gone: - #11190 fell CSV back to vanilla Spark - 0658e906f / 97f463813 / 9ea8290a / d206c5e20 removed the JVM callers, the ArrowUtil reader path, and the spark.gluten.sql.native.arrow.reader.enabled config Nothing inside Gluten or Velox now references the symbols the patch introduces, so it is dead code in the build. This change drops: - ep/build-velox/src/modify_arrow_dataset_scan_option.patch - the `patch -p1 <...>` line in dev/build-arrow.sh - the `cp` / `git add` lines in ep/build-velox/src/get-velox.sh that staged the patch into the Velox source tree Verification (Ubuntu 24.04, x86_64): - grep across the repo: 0 callers of CsvFragmentScanOptions, CsvFragmentScanOptions::from, DeserializeMap, mapToExpressionLiteral, createNative(... FragmentScanOptions), CsvConvertOptions, testCsvConvertOptions - `nm -C` on the freshly built libarrow_dataset.a (java-dist and cpp-jni) shows none of the patch's symbols are present - arrow_ep/cpp/src/arrow/dataset/file_csv.cc on disk is the unmodified upstream source -- patch was never applied - dev/buildbundle-veloxbe.sh --enable_vcpkg=ON completes BUILD SUCCESS for all 5 Spark profiles (3.3 / 3.4 / 3.5 / 4.0 / 4.1) - spark-shell on the resulting bundle, reading a CSV file, prints "GlutenFallbackReporter: Validation failed for plan: Scan csv , due to: Unsupported file format TextReadFormat" and produces a vanilla `FileScan csv` physical plan -- confirming CSV is fallback-by-design and never enters the native path the dropped patch fed Generated-by: Claude claude-opus-4.7 --- dev/build-arrow.sh | 1 - ep/build-velox/src/get-velox.sh | 2 - .../modify_arrow_dataset_scan_option.patch | 883 ------------------ 3 files changed, 886 deletions(-) delete mode 100644 ep/build-velox/src/modify_arrow_dataset_scan_option.patch diff --git a/dev/build-arrow.sh b/dev/build-arrow.sh index 54c6faaf331..1bf4ef9d089 100755 --- a/dev/build-arrow.sh +++ b/dev/build-arrow.sh @@ -31,7 +31,6 @@ function prepare_arrow_build() { #wget_and_untar https://archive.apache.org/dist/arrow/arrow-${VELOX_ARROW_BUILD_VERSION}/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep cd arrow_ep patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow.patch - patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow_dataset_scan_option.patch patch -p1 < $CURRENT_DIR/../ep/build-velox/src/cmake-compatibility.patch patch -p1 < $CURRENT_DIR/../ep/build-velox/src/support_ibm_power.patch popd diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 5291d456f06..023356c6a37 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -146,10 +146,8 @@ function apply_compilation_fixes { SUDO_CMD="sudo" fi $SUDO_CMD cp ${CURRENT_DIR}/modify_arrow.patch ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/ - $SUDO_CMD cp ${CURRENT_DIR}/modify_arrow_dataset_scan_option.patch ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/ git add ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/modify_arrow.patch # to avoid the file from being deleted by git clean -dffx :/ - git add ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/modify_arrow_dataset_scan_option.patch # to avoid the file from being deleted by git clean -dffx :/ } function setup_linux { diff --git a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch b/ep/build-velox/src/modify_arrow_dataset_scan_option.patch deleted file mode 100644 index 4af78c030c0..00000000000 --- a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch +++ /dev/null @@ -1,883 +0,0 @@ -diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc -index 09ab77572..f09377cf9 100644 ---- a/cpp/src/arrow/dataset/file_csv.cc -+++ b/cpp/src/arrow/dataset/file_csv.cc -@@ -24,6 +24,7 @@ - #include - #include - -+#include "arrow/c/bridge.h" - #include "arrow/csv/options.h" - #include "arrow/csv/parser.h" - #include "arrow/csv/reader.h" -@@ -52,6 +53,9 @@ using internal::Executor; - using internal::SerialExecutor; - - namespace dataset { -+namespace { -+inline bool parseBool(const std::string& value) { return value == "true" ? true : false; } -+} // namespace - - struct CsvInspectedFragment : public InspectedFragment { - CsvInspectedFragment(std::vector column_names, -@@ -503,5 +507,33 @@ Future<> CsvFileWriter::FinishInternal() { - return Status::OK(); - } - -+Result> CsvFragmentScanOptions::from( -+ const std::unordered_map& configs) { -+ std::shared_ptr options = -+ std::make_shared(); -+ for (auto const& it : configs) { -+ auto& key = it.first; -+ auto& value = it.second; -+ if (key == "delimiter") { -+ options->parse_options.delimiter = value.data()[0]; -+ } else if (key == "quoting") { -+ options->parse_options.quoting = parseBool(value); -+ } else if (key == "column_types") { -+ int64_t schema_address = std::stol(value); -+ ArrowSchema* cSchema = reinterpret_cast(schema_address); -+ ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(cSchema)); -+ auto& column_types = options->convert_options.column_types; -+ for (auto field : schema->fields()) { -+ column_types[field->name()] = field->type(); -+ } -+ } else if (key == "strings_can_be_null") { -+ options->convert_options.strings_can_be_null = parseBool(value); -+ } else { -+ return Status::Invalid("Config " + it.first + "is not supported."); -+ } -+ } -+ return options; -+} -+ - } // namespace dataset - } // namespace arrow -diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h -index 42e3fd724..4d2825183 100644 ---- a/cpp/src/arrow/dataset/file_csv.h -+++ b/cpp/src/arrow/dataset/file_csv.h -@@ -85,6 +85,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { - struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { - std::string type_name() const override { return kCsvTypeName; } - -+ static Result> from( -+ const std::unordered_map& configs); -+ - using StreamWrapFunc = std::function>( - std::shared_ptr)>; - -diff --git a/cpp/src/arrow/engine/substrait/expression_internal.cc b/cpp/src/arrow/engine/substrait/expression_internal.cc -index 5d892af9a..0f8b0448b 100644 ---- a/cpp/src/arrow/engine/substrait/expression_internal.cc -+++ b/cpp/src/arrow/engine/substrait/expression_internal.cc -@@ -1337,5 +1337,17 @@ Result> ToProto( - return std::move(out); - } - -+Status FromProto(const substrait::Expression::Literal& literal, -+ std::unordered_map& out) { -+ ARROW_RETURN_IF(!literal.has_map(), Status::Invalid("Literal does not have a map.")); -+ auto literalMap = literal.map(); -+ auto size = literalMap.key_values_size(); -+ for (auto i = 0; i < size; i++) { -+ substrait::Expression_Literal_Map_KeyValue keyValue = literalMap.key_values(i); -+ out.emplace(keyValue.key().string(), keyValue.value().string()); -+ } -+ return Status::OK(); -+} -+ - } // namespace engine - } // namespace arrow -diff --git a/cpp/src/arrow/engine/substrait/expression_internal.h b/cpp/src/arrow/engine/substrait/expression_internal.h -index 2ce2ee76a..9be81b7ab 100644 ---- a/cpp/src/arrow/engine/substrait/expression_internal.h -+++ b/cpp/src/arrow/engine/substrait/expression_internal.h -@@ -61,5 +61,9 @@ ARROW_ENGINE_EXPORT - Result FromProto(const substrait::AggregateFunction&, bool is_hash, - const ExtensionSet&, const ConversionOptions&); - -+ARROW_ENGINE_EXPORT -+Status FromProto(const substrait::Expression::Literal& literal, -+ std::unordered_map& out); -+ - } // namespace engine - } // namespace arrow -diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc -index 9e670f121..02e5c7171 100644 ---- a/cpp/src/arrow/engine/substrait/serde.cc -+++ b/cpp/src/arrow/engine/substrait/serde.cc -@@ -247,6 +247,16 @@ Result DeserializeExpressions( - return FromProto(extended_expression, ext_set_out, conversion_options, registry); - } - -+Status DeserializeMap(const Buffer& buf, -+ std::unordered_map& out) { -+ // ARROW_ASSIGN_OR_RAISE(auto advanced_extension, -+ // ParseFromBuffer(buf)); -+ // return FromProto(advanced_extension, out); -+ ARROW_ASSIGN_OR_RAISE(auto literal, -+ ParseFromBuffer(buf)); -+ return FromProto(literal, out); -+} -+ - namespace { - - Result> MakeSingleDeclarationPlan( -diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h -index ab749f4a6..6312ec239 100644 ---- a/cpp/src/arrow/engine/substrait/serde.h -+++ b/cpp/src/arrow/engine/substrait/serde.h -@@ -23,6 +23,7 @@ - #include - #include - #include -+#include - #include - - #include "arrow/compute/type_fwd.h" -@@ -183,6 +184,9 @@ ARROW_ENGINE_EXPORT Result DeserializeExpressions( - const ConversionOptions& conversion_options = {}, - ExtensionSet* ext_set_out = NULLPTR); - -+ARROW_ENGINE_EXPORT Status -+DeserializeMap(const Buffer& buf, std::unordered_map& out); -+ - /// \brief Deserializes a Substrait Type message to the corresponding Arrow type - /// - /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type -diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml -index d4d3e2c0f..ce72eaa1f 100644 ---- a/java/dataset/pom.xml -+++ b/java/dataset/pom.xml -@@ -25,9 +25,10 @@ - jar - - ../../../cpp/release-build/ -- 2.5.0 - 1.11.0 - 1.11.3 -+ 0.31.0 -+ 3.25.3 - - - -@@ -47,6 +48,18 @@ - arrow-c-data - compile - -+ -+ io.substrait -+ core -+ ${substrait.version} -+ provided -+ -+ -+ com.google.protobuf -+ protobuf-java -+ ${protobuf.version} -+ provided -+ - - org.apache.arrow - arrow-memory-netty -diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc -index 8d7dafd84..89cdc39fe 100644 ---- a/java/dataset/src/main/cpp/jni_wrapper.cc -+++ b/java/dataset/src/main/cpp/jni_wrapper.cc -@@ -25,6 +25,7 @@ - #include "arrow/c/helpers.h" - #include "arrow/dataset/api.h" - #include "arrow/dataset/file_base.h" -+#include "arrow/dataset/file_csv.h" - #include "arrow/filesystem/localfs.h" - #include "arrow/filesystem/path_util.h" - #ifdef ARROW_S3 -@@ -122,6 +123,19 @@ arrow::Result> GetFileFormat( - } - } - -+arrow::Result> -+GetFragmentScanOptions(jint file_format_id, -+ const std::unordered_map& configs) { -+ switch (file_format_id) { -+#ifdef ARROW_CSV -+ case 3: -+ return arrow::dataset::CsvFragmentScanOptions::from(configs); -+#endif -+ default: -+ return arrow::Status::Invalid("Illegal file format id: " ,file_format_id); -+ } -+} -+ - class ReserveFromJava : public arrow::dataset::jni::ReservationListener { - public: - ReserveFromJava(JavaVM* vm, jobject java_reservation_listener) -@@ -460,12 +474,13 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset - /* - * Class: org_apache_arrow_dataset_jni_JniWrapper - * Method: createScanner -- * Signature: (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J -+ * Signature: -+ * (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ;Ljava/nio/ByteBuffer;J)J - */ - JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner( - JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, -- jobject substrait_projection, jobject substrait_filter, -- jlong batch_size, jlong memory_pool_id) { -+ jobject substrait_projection, jobject substrait_filter, jlong batch_size, -+ jlong file_format_id, jobject options, jlong memory_pool_id) { - JNI_METHOD_START - arrow::MemoryPool* pool = reinterpret_cast(memory_pool_id); - if (pool == nullptr) { -@@ -514,6 +529,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann - } - JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr)); - } -+ if (file_format_id != -1 && options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options)); -+ } - JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size)); - - auto scanner = JniGetOrThrow(scanner_builder->Finish()); -@@ -627,14 +650,31 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Fina - /* - * Class: org_apache_arrow_dataset_file_JniWrapper - * Method: makeFileSystemDatasetFactory -- * Signature: (Ljava/lang/String;II)J -+ * Signature: (Ljava/lang/String;IILjava/lang/String;Ljava/nio/ByteBuffer)J - */ - JNIEXPORT jlong JNICALL --Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I( -- JNIEnv* env, jobject, jstring uri, jint file_format_id) { -+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( -+ JNIEnv* env, jobject, jstring uri, jint file_format_id, jobject options) { - JNI_METHOD_START - std::shared_ptr file_format = - JniGetOrThrow(GetFileFormat(file_format_id)); -+ if (options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ file_format->default_fragment_scan_options = scan_options; -+#ifdef ARROW_CSV -+ if (file_format_id == 3) { -+ std::shared_ptr csv_file_format = -+ std::dynamic_pointer_cast(file_format); -+ csv_file_format->parse_options = -+ std::dynamic_pointer_cast(scan_options) -+ ->parse_options; -+ } -+#endif -+ } - arrow::dataset::FileSystemFactoryOptions options; - std::shared_ptr d = - JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( -@@ -645,16 +685,33 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav - - /* - * Class: org_apache_arrow_dataset_file_JniWrapper -- * Method: makeFileSystemDatasetFactory -- * Signature: ([Ljava/lang/String;II)J -+ * Method: makeFileSystemDatasetFactoryWithFiles -+ * Signature: ([Ljava/lang/String;IIJ;Ljava/nio/ByteBuffer)J - */ - JNIEXPORT jlong JNICALL --Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I( -- JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { -+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactoryWithFiles( -+ JNIEnv* env, jobject, jobjectArray uris, jint file_format_id, jobject options) { - JNI_METHOD_START - - std::shared_ptr file_format = - JniGetOrThrow(GetFileFormat(file_format_id)); -+ if (options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ file_format->default_fragment_scan_options = scan_options; -+#ifdef ARROW_CSV -+ if (file_format_id == 3) { -+ std::shared_ptr csv_file_format = -+ std::dynamic_pointer_cast(file_format); -+ csv_file_format->parse_options = -+ std::dynamic_pointer_cast(scan_options) -+ ->parse_options; -+ } -+#endif -+ } - arrow::dataset::FileSystemFactoryOptions options; - - std::vector uri_vec = ToStringVector(env, uris); -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -index aa3156905..a0b6fb168 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -@@ -17,8 +17,11 @@ - - package org.apache.arrow.dataset.file; - -+import java.util.Optional; -+ - import org.apache.arrow.dataset.jni.NativeDatasetFactory; - import org.apache.arrow.dataset.jni.NativeMemoryPool; -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; - import org.apache.arrow.memory.BufferAllocator; - - /** -@@ -27,21 +30,34 @@ import org.apache.arrow.memory.BufferAllocator; - public class FileSystemDatasetFactory extends NativeDatasetFactory { - - public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -- String uri) { -- super(allocator, memoryPool, createNative(format, uri)); -+ String uri, Optional fragmentScanOptions) { -+ super(allocator, memoryPool, createNative(format, uri, fragmentScanOptions)); -+ } -+ -+ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -+ String uri) { -+ super(allocator, memoryPool, createNative(format, uri, Optional.empty())); -+ } -+ -+ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -+ String[] uris, Optional fragmentScanOptions) { -+ super(allocator, memoryPool, createNative(format, uris, fragmentScanOptions)); - } - - public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, - String[] uris) { -- super(allocator, memoryPool, createNative(format, uris)); -+ super(allocator, memoryPool, createNative(format, uris, Optional.empty())); - } - -- private static long createNative(FileFormat format, String uri) { -- return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); -+ private static long createNative(FileFormat format, String uri, Optional fragmentScanOptions) { -+ return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id(), -+ fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null)); - } - -- private static long createNative(FileFormat format, String[] uris) { -- return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); -+ private static long createNative(FileFormat format, String[] uris, -+ Optional fragmentScanOptions) { -+ return JniWrapper.get().makeFileSystemDatasetFactoryWithFiles(uris, format.id(), -+ fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null)); - } - - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -index c3a1a4e58..c3f8e12b3 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -@@ -17,6 +17,8 @@ - - package org.apache.arrow.dataset.file; - -+import java.nio.ByteBuffer; -+ - import org.apache.arrow.dataset.jni.JniLoader; - - /** -@@ -43,7 +45,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. - * @see FileFormat - */ -- public native long makeFileSystemDatasetFactory(String uri, int fileFormat); -+ public native long makeFileSystemDatasetFactory(String uri, int fileFormat, -+ ByteBuffer serializedFragmentScanOptions); - - /** - * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a -@@ -54,7 +57,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. - * @see FileFormat - */ -- public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); -+ public native long makeFileSystemDatasetFactoryWithFiles(String[] uris, int fileFormat, -+ ByteBuffer serializedFragmentScanOptions); - - /** - * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -index 637a3e8f2..6d6309140 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -@@ -80,7 +80,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::Scanner instance. - */ - public native long createScanner(long datasetId, String[] columns, ByteBuffer substraitProjection, -- ByteBuffer substraitFilter, long batchSize, long memoryPool); -+ ByteBuffer substraitFilter, long batchSize, long fileFormat, -+ ByteBuffer serializedFragmentScanOptions, long memoryPool); - - /** - * Get a serialized schema from native instance of a Scanner. -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -index d9abad997..3a96fe768 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -@@ -17,6 +17,9 @@ - - package org.apache.arrow.dataset.jni; - -+import java.nio.ByteBuffer; -+ -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; - import org.apache.arrow.dataset.scanner.ScanOptions; - import org.apache.arrow.dataset.source.Dataset; - -@@ -40,11 +43,18 @@ public class NativeDataset implements Dataset { - if (closed) { - throw new NativeInstanceReleasedException(); - } -- -+ int fileFormat = -1; -+ ByteBuffer serialized = null; -+ if (options.getFragmentScanOptions().isPresent()) { -+ FragmentScanOptions fragmentScanOptions = options.getFragmentScanOptions().get(); -+ fileFormat = fragmentScanOptions.fileFormatId(); -+ serialized = fragmentScanOptions.serialize(); -+ } - long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null), - options.getSubstraitProjection().orElse(null), - options.getSubstraitFilter().orElse(null), -- options.getBatchSize(), context.getMemoryPool().getNativeInstanceId()); -+ options.getBatchSize(), fileFormat, serialized, -+ context.getMemoryPool().getNativeInstanceId()); - - return new NativeScanner(context, scannerId); - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java -new file mode 100644 -index 000000000..8acb2b2d4 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java -@@ -0,0 +1,50 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner; -+ -+import java.nio.ByteBuffer; -+import java.util.Map; -+ -+import org.apache.arrow.dataset.substrait.util.ConvertUtil; -+ -+import io.substrait.proto.Expression; -+ -+public interface FragmentScanOptions { -+ String typeName(); -+ -+ int fileFormatId(); -+ -+ ByteBuffer serialize(); -+ -+ /** -+ * serialize the map. -+ * -+ * @param config config map -+ * @return bufer to jni call argument, should be DirectByteBuffer -+ */ -+ default ByteBuffer serializeMap(Map config) { -+ if (config.isEmpty()) { -+ return null; -+ } -+ -+ Expression.Literal literal = ConvertUtil.mapToExpressionLiteral(config); -+ ByteBuffer buf = ByteBuffer.allocateDirect(literal.getSerializedSize()); -+ buf.put(literal.toByteArray()); -+ return buf; -+ } -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -index 995d05ac3..aad71930c 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -@@ -31,6 +31,8 @@ public class ScanOptions { - private final Optional substraitProjection; - private final Optional substraitFilter; - -+ private final Optional fragmentScanOptions; -+ - /** - * Constructor. - * @param columns Projected columns. Empty for scanning all columns. -@@ -61,6 +63,7 @@ public class ScanOptions { - this.columns = columns; - this.substraitProjection = Optional.empty(); - this.substraitFilter = Optional.empty(); -+ this.fragmentScanOptions = Optional.empty(); - } - - public ScanOptions(long batchSize) { -@@ -83,6 +86,10 @@ public class ScanOptions { - return substraitFilter; - } - -+ public Optional getFragmentScanOptions() { -+ return fragmentScanOptions; -+ } -+ - /** - * Builder for Options used during scanning. - */ -@@ -91,6 +98,7 @@ public class ScanOptions { - private Optional columns; - private ByteBuffer substraitProjection; - private ByteBuffer substraitFilter; -+ private FragmentScanOptions fragmentScanOptions; - - /** - * Constructor. -@@ -136,6 +144,18 @@ public class ScanOptions { - return this; - } - -+ /** -+ * Set the FragmentScanOptions. -+ * -+ * @param fragmentScanOptions scan options -+ * @return the ScanOptions configured. -+ */ -+ public Builder fragmentScanOptions(FragmentScanOptions fragmentScanOptions) { -+ Preconditions.checkNotNull(fragmentScanOptions); -+ this.fragmentScanOptions = fragmentScanOptions; -+ return this; -+ } -+ - public ScanOptions build() { - return new ScanOptions(this); - } -@@ -146,5 +166,6 @@ public class ScanOptions { - columns = builder.columns; - substraitProjection = Optional.ofNullable(builder.substraitProjection); - substraitFilter = Optional.ofNullable(builder.substraitFilter); -+ fragmentScanOptions = Optional.ofNullable(builder.fragmentScanOptions); - } - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java -new file mode 100644 -index 000000000..08e35ede2 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java -@@ -0,0 +1,51 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner.csv; -+ -+import java.util.Map; -+import java.util.Optional; -+ -+import org.apache.arrow.c.ArrowSchema; -+ -+public class CsvConvertOptions { -+ -+ private final Map configs; -+ -+ private Optional cSchema = Optional.empty(); -+ -+ public CsvConvertOptions(Map configs) { -+ this.configs = configs; -+ } -+ -+ public Optional getArrowSchema() { -+ return cSchema; -+ } -+ -+ public Map getConfigs() { -+ return configs; -+ } -+ -+ public void set(String key, String value) { -+ configs.put(key, value); -+ } -+ -+ public void setArrowSchema(ArrowSchema cSchema) { -+ this.cSchema = Optional.of(cSchema); -+ } -+ -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java -new file mode 100644 -index 000000000..88973f0a0 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java -@@ -0,0 +1,97 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner.csv; -+ -+import java.io.Serializable; -+import java.nio.ByteBuffer; -+import java.util.Locale; -+import java.util.Map; -+import java.util.stream.Collectors; -+import java.util.stream.Stream; -+ -+import org.apache.arrow.dataset.file.FileFormat; -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; -+ -+public class CsvFragmentScanOptions implements Serializable, FragmentScanOptions { -+ private final CsvConvertOptions convertOptions; -+ private final Map readOptions; -+ private final Map parseOptions; -+ -+ -+ /** -+ * csv scan options, map to CPP struct CsvFragmentScanOptions. -+ * -+ * @param convertOptions same struct in CPP -+ * @param readOptions same struct in CPP -+ * @param parseOptions same struct in CPP -+ */ -+ public CsvFragmentScanOptions(CsvConvertOptions convertOptions, -+ Map readOptions, -+ Map parseOptions) { -+ this.convertOptions = convertOptions; -+ this.readOptions = readOptions; -+ this.parseOptions = parseOptions; -+ } -+ -+ public String typeName() { -+ return FileFormat.CSV.name().toLowerCase(Locale.ROOT); -+ } -+ -+ /** -+ * File format id. -+ * -+ * @return id -+ */ -+ public int fileFormatId() { -+ return FileFormat.CSV.id(); -+ } -+ -+ /** -+ * Serialize this class to ByteBuffer and then called by jni call. -+ * -+ * @return DirectByteBuffer -+ */ -+ public ByteBuffer serialize() { -+ Map options = Stream.concat(Stream.concat(readOptions.entrySet().stream(), -+ parseOptions.entrySet().stream()), -+ convertOptions.getConfigs().entrySet().stream()).collect( -+ Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); -+ -+ if (convertOptions.getArrowSchema().isPresent()) { -+ options.put("column_types", Long.toString(convertOptions.getArrowSchema().get().memoryAddress())); -+ } -+ return serializeMap(options); -+ } -+ -+ public static CsvFragmentScanOptions deserialize(String serialized) { -+ throw new UnsupportedOperationException("Not implemented now"); -+ } -+ -+ public CsvConvertOptions getConvertOptions() { -+ return convertOptions; -+ } -+ -+ public Map getReadOptions() { -+ return readOptions; -+ } -+ -+ public Map getParseOptions() { -+ return parseOptions; -+ } -+ -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java -new file mode 100644 -index 000000000..31a4023af ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java -@@ -0,0 +1,46 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.substrait.util; -+ -+import java.util.Map; -+ -+import io.substrait.proto.Expression; -+ -+public class ConvertUtil { -+ -+ /** -+ * Convert map to substrait Expression. -+ * -+ * @return Substrait Expression -+ */ -+ public static Expression.Literal mapToExpressionLiteral(Map values) { -+ Expression.Literal.Builder literalBuilder = Expression.Literal.newBuilder(); -+ Expression.Literal.Map.KeyValue.Builder keyValueBuilder = -+ Expression.Literal.Map.KeyValue.newBuilder(); -+ Expression.Literal.Map.Builder mapBuilder = Expression.Literal.Map.newBuilder(); -+ for (Map.Entry entry : values.entrySet()) { -+ literalBuilder.setString(entry.getKey()); -+ keyValueBuilder.setKey(literalBuilder.build()); -+ literalBuilder.setString(entry.getValue()); -+ keyValueBuilder.setValue(literalBuilder.build()); -+ mapBuilder.addKeyValues(keyValueBuilder.build()); -+ } -+ literalBuilder.setMap(mapBuilder.build()); -+ return literalBuilder.build(); -+ } -+} -diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -index 0fba72892..e7903b7a4 100644 ---- a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -@@ -31,6 +31,9 @@ import java.util.HashMap; - import java.util.Map; - import java.util.Optional; - -+import org.apache.arrow.c.ArrowSchema; -+import org.apache.arrow.c.CDataDictionaryProvider; -+import org.apache.arrow.c.Data; - import org.apache.arrow.dataset.ParquetWriteSupport; - import org.apache.arrow.dataset.TestDataset; - import org.apache.arrow.dataset.file.FileFormat; -@@ -38,8 +41,11 @@ import org.apache.arrow.dataset.file.FileSystemDatasetFactory; - import org.apache.arrow.dataset.jni.NativeMemoryPool; - import org.apache.arrow.dataset.scanner.ScanOptions; - import org.apache.arrow.dataset.scanner.Scanner; -+import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions; -+import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions; - import org.apache.arrow.dataset.source.Dataset; - import org.apache.arrow.dataset.source.DatasetFactory; -+import org.apache.arrow.memory.BufferAllocator; - import org.apache.arrow.vector.ipc.ArrowReader; - import org.apache.arrow.vector.types.pojo.ArrowType; - import org.apache.arrow.vector.types.pojo.Field; -@@ -49,6 +55,8 @@ import org.junit.ClassRule; - import org.junit.Test; - import org.junit.rules.TemporaryFolder; - -+import com.google.common.collect.ImmutableMap; -+ - public class TestAceroSubstraitConsumer extends TestDataset { - - @ClassRule -@@ -457,4 +465,42 @@ public class TestAceroSubstraitConsumer extends TestDataset { - substraitExpression.put(decodedSubstrait); - return substraitExpression; - } -+ -+ @Test -+ public void testCsvConvertOptions() throws Exception { -+ final Schema schema = new Schema(Arrays.asList( -+ Field.nullable("Id", new ArrowType.Int(32, true)), -+ Field.nullable("Name", new ArrowType.Utf8()), -+ Field.nullable("Language", new ArrowType.Utf8()) -+ ), null); -+ String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv"; -+ BufferAllocator allocator = rootAllocator(); -+ try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator); -+ CDataDictionaryProvider provider = new CDataDictionaryProvider()) { -+ Data.exportSchema(allocator, schema, provider, cSchema); -+ CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of("delimiter", ";")); -+ convertOptions.setArrowSchema(cSchema); -+ CsvFragmentScanOptions fragmentScanOptions = new CsvFragmentScanOptions( -+ convertOptions, ImmutableMap.of(), ImmutableMap.of()); -+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768) -+ .columns(Optional.empty()) -+ .fragmentScanOptions(fragmentScanOptions) -+ .build(); -+ try ( -+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), -+ FileFormat.CSV, path); -+ Dataset dataset = datasetFactory.finish(); -+ Scanner scanner = dataset.newScan(options); -+ ArrowReader reader = scanner.scanBatches() -+ ) { -+ assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields()); -+ int rowCount = 0; -+ while (reader.loadNextBatch()) { -+ assertEquals("[1, 2, 3]", reader.getVectorSchemaRoot().getVector("Id").toString()); -+ rowCount += reader.getVectorSchemaRoot().getRowCount(); -+ } -+ assertEquals(3, rowCount); -+ } -+ } -+ } - } -diff --git a/java/dataset/src/test/resources/data/student.csv b/java/dataset/src/test/resources/data/student.csv -new file mode 100644 -index 000000000..329194609 ---- /dev/null -+++ b/java/dataset/src/test/resources/data/student.csv -@@ -0,0 +1,4 @@ -+Id;Name;Language -+1;Juno;Java -+2;Peter;Python -+3;Celin;C++