diff --git a/dev/build-arrow.sh b/dev/build-arrow.sh index 54c6faaf331..1bf4ef9d089 100755 --- a/dev/build-arrow.sh +++ b/dev/build-arrow.sh @@ -31,7 +31,6 @@ function prepare_arrow_build() { #wget_and_untar https://archive.apache.org/dist/arrow/arrow-${VELOX_ARROW_BUILD_VERSION}/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep cd arrow_ep patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow.patch - patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow_dataset_scan_option.patch patch -p1 < $CURRENT_DIR/../ep/build-velox/src/cmake-compatibility.patch patch -p1 < $CURRENT_DIR/../ep/build-velox/src/support_ibm_power.patch popd diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 5291d456f06..023356c6a37 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -146,10 +146,8 @@ function apply_compilation_fixes { SUDO_CMD="sudo" fi $SUDO_CMD cp ${CURRENT_DIR}/modify_arrow.patch ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/ - $SUDO_CMD cp ${CURRENT_DIR}/modify_arrow_dataset_scan_option.patch ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/ git add ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/modify_arrow.patch # to avoid the file from being deleted by git clean -dffx :/ - git add ${VELOX_HOME}/CMake/resolve_dependency_modules/arrow/modify_arrow_dataset_scan_option.patch # to avoid the file from being deleted by git clean -dffx :/ } function setup_linux { diff --git a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch b/ep/build-velox/src/modify_arrow_dataset_scan_option.patch deleted file mode 100644 index 4af78c030c0..00000000000 --- a/ep/build-velox/src/modify_arrow_dataset_scan_option.patch +++ /dev/null @@ -1,883 +0,0 @@ -diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc -index 09ab77572..f09377cf9 100644 ---- a/cpp/src/arrow/dataset/file_csv.cc -+++ b/cpp/src/arrow/dataset/file_csv.cc -@@ -24,6 +24,7 @@ - #include - #include - -+#include "arrow/c/bridge.h" - #include "arrow/csv/options.h" - #include "arrow/csv/parser.h" - #include "arrow/csv/reader.h" -@@ -52,6 +53,9 @@ using internal::Executor; - using internal::SerialExecutor; - - namespace dataset { -+namespace { -+inline bool parseBool(const std::string& value) { return value == "true" ? true : false; } -+} // namespace - - struct CsvInspectedFragment : public InspectedFragment { - CsvInspectedFragment(std::vector column_names, -@@ -503,5 +507,33 @@ Future<> CsvFileWriter::FinishInternal() { - return Status::OK(); - } - -+Result> CsvFragmentScanOptions::from( -+ const std::unordered_map& configs) { -+ std::shared_ptr options = -+ std::make_shared(); -+ for (auto const& it : configs) { -+ auto& key = it.first; -+ auto& value = it.second; -+ if (key == "delimiter") { -+ options->parse_options.delimiter = value.data()[0]; -+ } else if (key == "quoting") { -+ options->parse_options.quoting = parseBool(value); -+ } else if (key == "column_types") { -+ int64_t schema_address = std::stol(value); -+ ArrowSchema* cSchema = reinterpret_cast(schema_address); -+ ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(cSchema)); -+ auto& column_types = options->convert_options.column_types; -+ for (auto field : schema->fields()) { -+ column_types[field->name()] = field->type(); -+ } -+ } else if (key == "strings_can_be_null") { -+ options->convert_options.strings_can_be_null = parseBool(value); -+ } else { -+ return Status::Invalid("Config " + it.first + "is not supported."); -+ } -+ } -+ return options; -+} -+ - } // namespace dataset - } // namespace arrow -diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h -index 42e3fd724..4d2825183 100644 ---- a/cpp/src/arrow/dataset/file_csv.h -+++ b/cpp/src/arrow/dataset/file_csv.h -@@ -85,6 +85,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { - struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { - std::string type_name() const override { return kCsvTypeName; } - -+ static Result> from( -+ const std::unordered_map& configs); -+ - using StreamWrapFunc = std::function>( - std::shared_ptr)>; - -diff --git a/cpp/src/arrow/engine/substrait/expression_internal.cc b/cpp/src/arrow/engine/substrait/expression_internal.cc -index 5d892af9a..0f8b0448b 100644 ---- a/cpp/src/arrow/engine/substrait/expression_internal.cc -+++ b/cpp/src/arrow/engine/substrait/expression_internal.cc -@@ -1337,5 +1337,17 @@ Result> ToProto( - return std::move(out); - } - -+Status FromProto(const substrait::Expression::Literal& literal, -+ std::unordered_map& out) { -+ ARROW_RETURN_IF(!literal.has_map(), Status::Invalid("Literal does not have a map.")); -+ auto literalMap = literal.map(); -+ auto size = literalMap.key_values_size(); -+ for (auto i = 0; i < size; i++) { -+ substrait::Expression_Literal_Map_KeyValue keyValue = literalMap.key_values(i); -+ out.emplace(keyValue.key().string(), keyValue.value().string()); -+ } -+ return Status::OK(); -+} -+ - } // namespace engine - } // namespace arrow -diff --git a/cpp/src/arrow/engine/substrait/expression_internal.h b/cpp/src/arrow/engine/substrait/expression_internal.h -index 2ce2ee76a..9be81b7ab 100644 ---- a/cpp/src/arrow/engine/substrait/expression_internal.h -+++ b/cpp/src/arrow/engine/substrait/expression_internal.h -@@ -61,5 +61,9 @@ ARROW_ENGINE_EXPORT - Result FromProto(const substrait::AggregateFunction&, bool is_hash, - const ExtensionSet&, const ConversionOptions&); - -+ARROW_ENGINE_EXPORT -+Status FromProto(const substrait::Expression::Literal& literal, -+ std::unordered_map& out); -+ - } // namespace engine - } // namespace arrow -diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc -index 9e670f121..02e5c7171 100644 ---- a/cpp/src/arrow/engine/substrait/serde.cc -+++ b/cpp/src/arrow/engine/substrait/serde.cc -@@ -247,6 +247,16 @@ Result DeserializeExpressions( - return FromProto(extended_expression, ext_set_out, conversion_options, registry); - } - -+Status DeserializeMap(const Buffer& buf, -+ std::unordered_map& out) { -+ // ARROW_ASSIGN_OR_RAISE(auto advanced_extension, -+ // ParseFromBuffer(buf)); -+ // return FromProto(advanced_extension, out); -+ ARROW_ASSIGN_OR_RAISE(auto literal, -+ ParseFromBuffer(buf)); -+ return FromProto(literal, out); -+} -+ - namespace { - - Result> MakeSingleDeclarationPlan( -diff --git a/cpp/src/arrow/engine/substrait/serde.h b/cpp/src/arrow/engine/substrait/serde.h -index ab749f4a6..6312ec239 100644 ---- a/cpp/src/arrow/engine/substrait/serde.h -+++ b/cpp/src/arrow/engine/substrait/serde.h -@@ -23,6 +23,7 @@ - #include - #include - #include -+#include - #include - - #include "arrow/compute/type_fwd.h" -@@ -183,6 +184,9 @@ ARROW_ENGINE_EXPORT Result DeserializeExpressions( - const ConversionOptions& conversion_options = {}, - ExtensionSet* ext_set_out = NULLPTR); - -+ARROW_ENGINE_EXPORT Status -+DeserializeMap(const Buffer& buf, std::unordered_map& out); -+ - /// \brief Deserializes a Substrait Type message to the corresponding Arrow type - /// - /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Type -diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml -index d4d3e2c0f..ce72eaa1f 100644 ---- a/java/dataset/pom.xml -+++ b/java/dataset/pom.xml -@@ -25,9 +25,10 @@ - jar - - ../../../cpp/release-build/ -- 2.5.0 - 1.11.0 - 1.11.3 -+ 0.31.0 -+ 3.25.3 - - - -@@ -47,6 +48,18 @@ - arrow-c-data - compile - -+ -+ io.substrait -+ core -+ ${substrait.version} -+ provided -+ -+ -+ com.google.protobuf -+ protobuf-java -+ ${protobuf.version} -+ provided -+ - - org.apache.arrow - arrow-memory-netty -diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc -index 8d7dafd84..89cdc39fe 100644 ---- a/java/dataset/src/main/cpp/jni_wrapper.cc -+++ b/java/dataset/src/main/cpp/jni_wrapper.cc -@@ -25,6 +25,7 @@ - #include "arrow/c/helpers.h" - #include "arrow/dataset/api.h" - #include "arrow/dataset/file_base.h" -+#include "arrow/dataset/file_csv.h" - #include "arrow/filesystem/localfs.h" - #include "arrow/filesystem/path_util.h" - #ifdef ARROW_S3 -@@ -122,6 +123,19 @@ arrow::Result> GetFileFormat( - } - } - -+arrow::Result> -+GetFragmentScanOptions(jint file_format_id, -+ const std::unordered_map& configs) { -+ switch (file_format_id) { -+#ifdef ARROW_CSV -+ case 3: -+ return arrow::dataset::CsvFragmentScanOptions::from(configs); -+#endif -+ default: -+ return arrow::Status::Invalid("Illegal file format id: " ,file_format_id); -+ } -+} -+ - class ReserveFromJava : public arrow::dataset::jni::ReservationListener { - public: - ReserveFromJava(JavaVM* vm, jobject java_reservation_listener) -@@ -460,12 +474,13 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset - /* - * Class: org_apache_arrow_dataset_jni_JniWrapper - * Method: createScanner -- * Signature: (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J -+ * Signature: -+ * (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ;Ljava/nio/ByteBuffer;J)J - */ - JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner( - JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, -- jobject substrait_projection, jobject substrait_filter, -- jlong batch_size, jlong memory_pool_id) { -+ jobject substrait_projection, jobject substrait_filter, jlong batch_size, -+ jlong file_format_id, jobject options, jlong memory_pool_id) { - JNI_METHOD_START - arrow::MemoryPool* pool = reinterpret_cast(memory_pool_id); - if (pool == nullptr) { -@@ -514,6 +529,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann - } - JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr)); - } -+ if (file_format_id != -1 && options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options)); -+ } - JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size)); - - auto scanner = JniGetOrThrow(scanner_builder->Finish()); -@@ -627,14 +650,31 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Fina - /* - * Class: org_apache_arrow_dataset_file_JniWrapper - * Method: makeFileSystemDatasetFactory -- * Signature: (Ljava/lang/String;II)J -+ * Signature: (Ljava/lang/String;IILjava/lang/String;Ljava/nio/ByteBuffer)J - */ - JNIEXPORT jlong JNICALL --Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I( -- JNIEnv* env, jobject, jstring uri, jint file_format_id) { -+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory( -+ JNIEnv* env, jobject, jstring uri, jint file_format_id, jobject options) { - JNI_METHOD_START - std::shared_ptr file_format = - JniGetOrThrow(GetFileFormat(file_format_id)); -+ if (options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ file_format->default_fragment_scan_options = scan_options; -+#ifdef ARROW_CSV -+ if (file_format_id == 3) { -+ std::shared_ptr csv_file_format = -+ std::dynamic_pointer_cast(file_format); -+ csv_file_format->parse_options = -+ std::dynamic_pointer_cast(scan_options) -+ ->parse_options; -+ } -+#endif -+ } - arrow::dataset::FileSystemFactoryOptions options; - std::shared_ptr d = - JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make( -@@ -645,16 +685,33 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav - - /* - * Class: org_apache_arrow_dataset_file_JniWrapper -- * Method: makeFileSystemDatasetFactory -- * Signature: ([Ljava/lang/String;II)J -+ * Method: makeFileSystemDatasetFactoryWithFiles -+ * Signature: ([Ljava/lang/String;IIJ;Ljava/nio/ByteBuffer)J - */ - JNIEXPORT jlong JNICALL --Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I( -- JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) { -+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactoryWithFiles( -+ JNIEnv* env, jobject, jobjectArray uris, jint file_format_id, jobject options) { - JNI_METHOD_START - - std::shared_ptr file_format = - JniGetOrThrow(GetFileFormat(file_format_id)); -+ if (options != nullptr) { -+ std::unordered_map option_map; -+ std::shared_ptr buffer = LoadArrowBufferFromByteBuffer(env, options); -+ JniAssertOkOrThrow(arrow::engine::DeserializeMap(*buffer, option_map)); -+ std::shared_ptr scan_options = -+ JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map)); -+ file_format->default_fragment_scan_options = scan_options; -+#ifdef ARROW_CSV -+ if (file_format_id == 3) { -+ std::shared_ptr csv_file_format = -+ std::dynamic_pointer_cast(file_format); -+ csv_file_format->parse_options = -+ std::dynamic_pointer_cast(scan_options) -+ ->parse_options; -+ } -+#endif -+ } - arrow::dataset::FileSystemFactoryOptions options; - - std::vector uri_vec = ToStringVector(env, uris); -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -index aa3156905..a0b6fb168 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java -@@ -17,8 +17,11 @@ - - package org.apache.arrow.dataset.file; - -+import java.util.Optional; -+ - import org.apache.arrow.dataset.jni.NativeDatasetFactory; - import org.apache.arrow.dataset.jni.NativeMemoryPool; -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; - import org.apache.arrow.memory.BufferAllocator; - - /** -@@ -27,21 +30,34 @@ import org.apache.arrow.memory.BufferAllocator; - public class FileSystemDatasetFactory extends NativeDatasetFactory { - - public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -- String uri) { -- super(allocator, memoryPool, createNative(format, uri)); -+ String uri, Optional fragmentScanOptions) { -+ super(allocator, memoryPool, createNative(format, uri, fragmentScanOptions)); -+ } -+ -+ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -+ String uri) { -+ super(allocator, memoryPool, createNative(format, uri, Optional.empty())); -+ } -+ -+ public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, -+ String[] uris, Optional fragmentScanOptions) { -+ super(allocator, memoryPool, createNative(format, uris, fragmentScanOptions)); - } - - public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, - String[] uris) { -- super(allocator, memoryPool, createNative(format, uris)); -+ super(allocator, memoryPool, createNative(format, uris, Optional.empty())); - } - -- private static long createNative(FileFormat format, String uri) { -- return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id()); -+ private static long createNative(FileFormat format, String uri, Optional fragmentScanOptions) { -+ return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id(), -+ fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null)); - } - -- private static long createNative(FileFormat format, String[] uris) { -- return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); -+ private static long createNative(FileFormat format, String[] uris, -+ Optional fragmentScanOptions) { -+ return JniWrapper.get().makeFileSystemDatasetFactoryWithFiles(uris, format.id(), -+ fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null)); - } - - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -index c3a1a4e58..c3f8e12b3 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java -@@ -17,6 +17,8 @@ - - package org.apache.arrow.dataset.file; - -+import java.nio.ByteBuffer; -+ - import org.apache.arrow.dataset.jni.JniLoader; - - /** -@@ -43,7 +45,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. - * @see FileFormat - */ -- public native long makeFileSystemDatasetFactory(String uri, int fileFormat); -+ public native long makeFileSystemDatasetFactory(String uri, int fileFormat, -+ ByteBuffer serializedFragmentScanOptions); - - /** - * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a -@@ -54,7 +57,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance. - * @see FileFormat - */ -- public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); -+ public native long makeFileSystemDatasetFactoryWithFiles(String[] uris, int fileFormat, -+ ByteBuffer serializedFragmentScanOptions); - - /** - * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -index 637a3e8f2..6d6309140 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java -@@ -80,7 +80,8 @@ public class JniWrapper { - * @return the native pointer of the arrow::dataset::Scanner instance. - */ - public native long createScanner(long datasetId, String[] columns, ByteBuffer substraitProjection, -- ByteBuffer substraitFilter, long batchSize, long memoryPool); -+ ByteBuffer substraitFilter, long batchSize, long fileFormat, -+ ByteBuffer serializedFragmentScanOptions, long memoryPool); - - /** - * Get a serialized schema from native instance of a Scanner. -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -index d9abad997..3a96fe768 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java -@@ -17,6 +17,9 @@ - - package org.apache.arrow.dataset.jni; - -+import java.nio.ByteBuffer; -+ -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; - import org.apache.arrow.dataset.scanner.ScanOptions; - import org.apache.arrow.dataset.source.Dataset; - -@@ -40,11 +43,18 @@ public class NativeDataset implements Dataset { - if (closed) { - throw new NativeInstanceReleasedException(); - } -- -+ int fileFormat = -1; -+ ByteBuffer serialized = null; -+ if (options.getFragmentScanOptions().isPresent()) { -+ FragmentScanOptions fragmentScanOptions = options.getFragmentScanOptions().get(); -+ fileFormat = fragmentScanOptions.fileFormatId(); -+ serialized = fragmentScanOptions.serialize(); -+ } - long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns().orElse(null), - options.getSubstraitProjection().orElse(null), - options.getSubstraitFilter().orElse(null), -- options.getBatchSize(), context.getMemoryPool().getNativeInstanceId()); -+ options.getBatchSize(), fileFormat, serialized, -+ context.getMemoryPool().getNativeInstanceId()); - - return new NativeScanner(context, scannerId); - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java -new file mode 100644 -index 000000000..8acb2b2d4 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java -@@ -0,0 +1,50 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner; -+ -+import java.nio.ByteBuffer; -+import java.util.Map; -+ -+import org.apache.arrow.dataset.substrait.util.ConvertUtil; -+ -+import io.substrait.proto.Expression; -+ -+public interface FragmentScanOptions { -+ String typeName(); -+ -+ int fileFormatId(); -+ -+ ByteBuffer serialize(); -+ -+ /** -+ * serialize the map. -+ * -+ * @param config config map -+ * @return bufer to jni call argument, should be DirectByteBuffer -+ */ -+ default ByteBuffer serializeMap(Map config) { -+ if (config.isEmpty()) { -+ return null; -+ } -+ -+ Expression.Literal literal = ConvertUtil.mapToExpressionLiteral(config); -+ ByteBuffer buf = ByteBuffer.allocateDirect(literal.getSerializedSize()); -+ buf.put(literal.toByteArray()); -+ return buf; -+ } -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -index 995d05ac3..aad71930c 100644 ---- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java -@@ -31,6 +31,8 @@ public class ScanOptions { - private final Optional substraitProjection; - private final Optional substraitFilter; - -+ private final Optional fragmentScanOptions; -+ - /** - * Constructor. - * @param columns Projected columns. Empty for scanning all columns. -@@ -61,6 +63,7 @@ public class ScanOptions { - this.columns = columns; - this.substraitProjection = Optional.empty(); - this.substraitFilter = Optional.empty(); -+ this.fragmentScanOptions = Optional.empty(); - } - - public ScanOptions(long batchSize) { -@@ -83,6 +86,10 @@ public class ScanOptions { - return substraitFilter; - } - -+ public Optional getFragmentScanOptions() { -+ return fragmentScanOptions; -+ } -+ - /** - * Builder for Options used during scanning. - */ -@@ -91,6 +98,7 @@ public class ScanOptions { - private Optional columns; - private ByteBuffer substraitProjection; - private ByteBuffer substraitFilter; -+ private FragmentScanOptions fragmentScanOptions; - - /** - * Constructor. -@@ -136,6 +144,18 @@ public class ScanOptions { - return this; - } - -+ /** -+ * Set the FragmentScanOptions. -+ * -+ * @param fragmentScanOptions scan options -+ * @return the ScanOptions configured. -+ */ -+ public Builder fragmentScanOptions(FragmentScanOptions fragmentScanOptions) { -+ Preconditions.checkNotNull(fragmentScanOptions); -+ this.fragmentScanOptions = fragmentScanOptions; -+ return this; -+ } -+ - public ScanOptions build() { - return new ScanOptions(this); - } -@@ -146,5 +166,6 @@ public class ScanOptions { - columns = builder.columns; - substraitProjection = Optional.ofNullable(builder.substraitProjection); - substraitFilter = Optional.ofNullable(builder.substraitFilter); -+ fragmentScanOptions = Optional.ofNullable(builder.fragmentScanOptions); - } - } -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java -new file mode 100644 -index 000000000..08e35ede2 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java -@@ -0,0 +1,51 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner.csv; -+ -+import java.util.Map; -+import java.util.Optional; -+ -+import org.apache.arrow.c.ArrowSchema; -+ -+public class CsvConvertOptions { -+ -+ private final Map configs; -+ -+ private Optional cSchema = Optional.empty(); -+ -+ public CsvConvertOptions(Map configs) { -+ this.configs = configs; -+ } -+ -+ public Optional getArrowSchema() { -+ return cSchema; -+ } -+ -+ public Map getConfigs() { -+ return configs; -+ } -+ -+ public void set(String key, String value) { -+ configs.put(key, value); -+ } -+ -+ public void setArrowSchema(ArrowSchema cSchema) { -+ this.cSchema = Optional.of(cSchema); -+ } -+ -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java -new file mode 100644 -index 000000000..88973f0a0 ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java -@@ -0,0 +1,97 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.scanner.csv; -+ -+import java.io.Serializable; -+import java.nio.ByteBuffer; -+import java.util.Locale; -+import java.util.Map; -+import java.util.stream.Collectors; -+import java.util.stream.Stream; -+ -+import org.apache.arrow.dataset.file.FileFormat; -+import org.apache.arrow.dataset.scanner.FragmentScanOptions; -+ -+public class CsvFragmentScanOptions implements Serializable, FragmentScanOptions { -+ private final CsvConvertOptions convertOptions; -+ private final Map readOptions; -+ private final Map parseOptions; -+ -+ -+ /** -+ * csv scan options, map to CPP struct CsvFragmentScanOptions. -+ * -+ * @param convertOptions same struct in CPP -+ * @param readOptions same struct in CPP -+ * @param parseOptions same struct in CPP -+ */ -+ public CsvFragmentScanOptions(CsvConvertOptions convertOptions, -+ Map readOptions, -+ Map parseOptions) { -+ this.convertOptions = convertOptions; -+ this.readOptions = readOptions; -+ this.parseOptions = parseOptions; -+ } -+ -+ public String typeName() { -+ return FileFormat.CSV.name().toLowerCase(Locale.ROOT); -+ } -+ -+ /** -+ * File format id. -+ * -+ * @return id -+ */ -+ public int fileFormatId() { -+ return FileFormat.CSV.id(); -+ } -+ -+ /** -+ * Serialize this class to ByteBuffer and then called by jni call. -+ * -+ * @return DirectByteBuffer -+ */ -+ public ByteBuffer serialize() { -+ Map options = Stream.concat(Stream.concat(readOptions.entrySet().stream(), -+ parseOptions.entrySet().stream()), -+ convertOptions.getConfigs().entrySet().stream()).collect( -+ Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); -+ -+ if (convertOptions.getArrowSchema().isPresent()) { -+ options.put("column_types", Long.toString(convertOptions.getArrowSchema().get().memoryAddress())); -+ } -+ return serializeMap(options); -+ } -+ -+ public static CsvFragmentScanOptions deserialize(String serialized) { -+ throw new UnsupportedOperationException("Not implemented now"); -+ } -+ -+ public CsvConvertOptions getConvertOptions() { -+ return convertOptions; -+ } -+ -+ public Map getReadOptions() { -+ return readOptions; -+ } -+ -+ public Map getParseOptions() { -+ return parseOptions; -+ } -+ -+} -diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java -new file mode 100644 -index 000000000..31a4023af ---- /dev/null -+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/util/ConvertUtil.java -@@ -0,0 +1,46 @@ -+/* -+ * Licensed to the Apache Software Foundation (ASF) under one or more -+ * contributor license agreements. See the NOTICE file distributed with -+ * this work for additional information regarding copyright ownership. -+ * The ASF licenses this file to You under the Apache License, Version 2.0 -+ * (the "License"); you may not use this file except in compliance with -+ * the License. You may obtain a copy of the License at -+ * -+ * http://www.apache.org/licenses/LICENSE-2.0 -+ * -+ * Unless required by applicable law or agreed to in writing, software -+ * distributed under the License is distributed on an "AS IS" BASIS, -+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -+ * See the License for the specific language governing permissions and -+ * limitations under the License. -+ */ -+ -+package org.apache.arrow.dataset.substrait.util; -+ -+import java.util.Map; -+ -+import io.substrait.proto.Expression; -+ -+public class ConvertUtil { -+ -+ /** -+ * Convert map to substrait Expression. -+ * -+ * @return Substrait Expression -+ */ -+ public static Expression.Literal mapToExpressionLiteral(Map values) { -+ Expression.Literal.Builder literalBuilder = Expression.Literal.newBuilder(); -+ Expression.Literal.Map.KeyValue.Builder keyValueBuilder = -+ Expression.Literal.Map.KeyValue.newBuilder(); -+ Expression.Literal.Map.Builder mapBuilder = Expression.Literal.Map.newBuilder(); -+ for (Map.Entry entry : values.entrySet()) { -+ literalBuilder.setString(entry.getKey()); -+ keyValueBuilder.setKey(literalBuilder.build()); -+ literalBuilder.setString(entry.getValue()); -+ keyValueBuilder.setValue(literalBuilder.build()); -+ mapBuilder.addKeyValues(keyValueBuilder.build()); -+ } -+ literalBuilder.setMap(mapBuilder.build()); -+ return literalBuilder.build(); -+ } -+} -diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -index 0fba72892..e7903b7a4 100644 ---- a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java -@@ -31,6 +31,9 @@ import java.util.HashMap; - import java.util.Map; - import java.util.Optional; - -+import org.apache.arrow.c.ArrowSchema; -+import org.apache.arrow.c.CDataDictionaryProvider; -+import org.apache.arrow.c.Data; - import org.apache.arrow.dataset.ParquetWriteSupport; - import org.apache.arrow.dataset.TestDataset; - import org.apache.arrow.dataset.file.FileFormat; -@@ -38,8 +41,11 @@ import org.apache.arrow.dataset.file.FileSystemDatasetFactory; - import org.apache.arrow.dataset.jni.NativeMemoryPool; - import org.apache.arrow.dataset.scanner.ScanOptions; - import org.apache.arrow.dataset.scanner.Scanner; -+import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions; -+import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions; - import org.apache.arrow.dataset.source.Dataset; - import org.apache.arrow.dataset.source.DatasetFactory; -+import org.apache.arrow.memory.BufferAllocator; - import org.apache.arrow.vector.ipc.ArrowReader; - import org.apache.arrow.vector.types.pojo.ArrowType; - import org.apache.arrow.vector.types.pojo.Field; -@@ -49,6 +55,8 @@ import org.junit.ClassRule; - import org.junit.Test; - import org.junit.rules.TemporaryFolder; - -+import com.google.common.collect.ImmutableMap; -+ - public class TestAceroSubstraitConsumer extends TestDataset { - - @ClassRule -@@ -457,4 +465,42 @@ public class TestAceroSubstraitConsumer extends TestDataset { - substraitExpression.put(decodedSubstrait); - return substraitExpression; - } -+ -+ @Test -+ public void testCsvConvertOptions() throws Exception { -+ final Schema schema = new Schema(Arrays.asList( -+ Field.nullable("Id", new ArrowType.Int(32, true)), -+ Field.nullable("Name", new ArrowType.Utf8()), -+ Field.nullable("Language", new ArrowType.Utf8()) -+ ), null); -+ String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv"; -+ BufferAllocator allocator = rootAllocator(); -+ try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator); -+ CDataDictionaryProvider provider = new CDataDictionaryProvider()) { -+ Data.exportSchema(allocator, schema, provider, cSchema); -+ CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of("delimiter", ";")); -+ convertOptions.setArrowSchema(cSchema); -+ CsvFragmentScanOptions fragmentScanOptions = new CsvFragmentScanOptions( -+ convertOptions, ImmutableMap.of(), ImmutableMap.of()); -+ ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768) -+ .columns(Optional.empty()) -+ .fragmentScanOptions(fragmentScanOptions) -+ .build(); -+ try ( -+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), -+ FileFormat.CSV, path); -+ Dataset dataset = datasetFactory.finish(); -+ Scanner scanner = dataset.newScan(options); -+ ArrowReader reader = scanner.scanBatches() -+ ) { -+ assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields()); -+ int rowCount = 0; -+ while (reader.loadNextBatch()) { -+ assertEquals("[1, 2, 3]", reader.getVectorSchemaRoot().getVector("Id").toString()); -+ rowCount += reader.getVectorSchemaRoot().getRowCount(); -+ } -+ assertEquals(3, rowCount); -+ } -+ } -+ } - } -diff --git a/java/dataset/src/test/resources/data/student.csv b/java/dataset/src/test/resources/data/student.csv -new file mode 100644 -index 000000000..329194609 ---- /dev/null -+++ b/java/dataset/src/test/resources/data/student.csv -@@ -0,0 +1,4 @@ -+Id;Name;Language -+1;Juno;Java -+2;Peter;Python -+3;Celin;C++