Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/jobs/scripts/check_style/check_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ ls -1d $ROOT_PATH/contrib/*-cmake | xargs -I@ find @ -name 'CMakeLists.txt' -or
# Wrong spelling of abbreviations, e.g. SQL is right, Sql is wrong. XMLHttpRequest is very wrong.
find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' |
grep -vP $EXCLUDE |
xargs grep -P 'Sql|Html|Xml|Cpu|Tcp|Udp|Http|Db|Json|Yaml' | grep -v -P 'RabbitMQ|Azure|Aws|aws|Avro|IO/S3|ai::JsonValue|IcebergWrites|arrow::flight|TcpExtListenOverflows' &&
xargs grep -P 'Sql|Html|Xml|Cpu|Tcp|Udp|Http|Db|Json|Yaml' | grep -v -P 'RabbitMQ|Azure|Aws|aws|Avro|IO/S3|ai::JsonValue|IcebergWrites|arrow::flight|SqlInfo|CommandGetSqlInfo|CommandGetDbSchemas|commandGetDbSchemas|ArrowFlightSql|TcpExtListenOverflows' &&
echo "Abbreviations such as SQL, XML, HTTP, should be in all caps. For example, SQL is right, Sql is wrong. XMLHttpRequest is very wrong."

find $ROOT_PATH/{src,base,programs,utils} -name '*.h' -or -name '*.cpp' |
Expand Down
41 changes: 27 additions & 14 deletions contrib/arrow-cmake/flight.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,33 @@ endif()

if(NOT ENABLE_GRPC)
message(${RECONFIGURE_MESSAGE_LEVEL} "Can't use ArrowFlight without gRPC")
return()
endif()

set(GRPC_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/grpc/include)

set(ARROW_FLIGHT_SRC_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/arrow/flight)
set(ARROW_FLIGHT_SQL_SRC_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/arrow/flight/sql)
set(ARROW_FLIGHT_PROTO_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/format)
set(ARROW_FLIGHT_GENERATED_SRC_DIR ${ARROW_GENERATED_SRC_DIR}/arrow/flight)
set(ARROW_FLIGHT_SQL_GENERATED_SRC_DIR ${ARROW_GENERATED_SRC_DIR}/arrow/flight/sql)

set(PROTOBUF_IMPORT_DIRS ${ARROW_FLIGHT_PROTO_DIR} ${ClickHouse_SOURCE_DIR}/contrib/google-protobuf/src)

PROTOBUF_GENERATE_GRPC_CPP(
flight_sources
flight_headers
APPEND_PATH
PROTOC_OUT_DIR ${ARROW_FLIGHT_GENERATED_SRC_DIR}
${ARROW_FLIGHT_PROTO_DIR}/Flight.proto
)

add_custom_command(
OUTPUT
"${ARROW_FLIGHT_GENERATED_SRC_DIR}/Flight.grpc.pb.cc"
"${ARROW_FLIGHT_GENERATED_SRC_DIR}/Flight.grpc.pb.h"
"${ARROW_FLIGHT_GENERATED_SRC_DIR}/Flight.pb.cc"
"${ARROW_FLIGHT_GENERATED_SRC_DIR}/Flight.pb.h"
COMMAND ${PROTOBUF_EXECUTABLE}
-I ${ARROW_FLIGHT_PROTO_DIR}
-I "${ClickHouse_SOURCE_DIR}/contrib/google-protobuf/src"
--cpp_out="${ARROW_FLIGHT_GENERATED_SRC_DIR}"
--grpc_out="${ARROW_FLIGHT_GENERATED_SRC_DIR}"
--plugin=protoc-gen-grpc="${GRPC_EXECUTABLE}"
"${ARROW_FLIGHT_PROTO_DIR}/Flight.proto"
PROTOBUF_GENERATE_GRPC_CPP(
flight_sql_sources
flight_sql_headers
APPEND_PATH
PROTOC_OUT_DIR ${ARROW_FLIGHT_SQL_GENERATED_SRC_DIR}
${ARROW_FLIGHT_PROTO_DIR}/FlightSql.proto
)

# NOTE: we do not compile the ${ARROW_FLIGHT_GENERATED_SRCS} directly, instead
Expand All @@ -37,6 +43,7 @@ add_custom_command(
# protobuf-internal.cc
set(ARROW_FLIGHT_SRCS
${ARROW_FLIGHT_GENERATED_SRC_DIR}/Flight.pb.cc
${ARROW_FLIGHT_SQL_GENERATED_SRC_DIR}/FlightSql.pb.cc
${ARROW_FLIGHT_SRC_DIR}/client.cc
${ARROW_FLIGHT_SRC_DIR}/client_cookie_middleware.cc
${ARROW_FLIGHT_SRC_DIR}/client_tracing_middleware.cc
Expand All @@ -54,6 +61,12 @@ set(ARROW_FLIGHT_SRCS
${ARROW_FLIGHT_SRC_DIR}/transport/grpc/serialization_internal.cc
${ARROW_FLIGHT_SRC_DIR}/transport/grpc/util_internal.cc
${ARROW_FLIGHT_SRC_DIR}/types.cc
${ARROW_FLIGHT_SQL_SRC_DIR}/client.cc
${ARROW_FLIGHT_SQL_SRC_DIR}/column_metadata.cc
${ARROW_FLIGHT_SQL_SRC_DIR}/protocol_internal.cc
${ARROW_FLIGHT_SQL_SRC_DIR}/server_session_middleware.cc
${ARROW_FLIGHT_SQL_SRC_DIR}/server.cc
${ARROW_FLIGHT_SQL_SRC_DIR}/sql_info_internal.cc
)

add_library(_arrow_flight ${ARROW_FLIGHT_SRCS})
Expand All @@ -62,4 +75,4 @@ add_library(ch_contrib::arrow_flight ALIAS _arrow_flight)
add_dependencies(_arrow_flight _protoc grpc_cpp_plugin)
target_link_libraries(_arrow_flight PUBLIC _arrow)
target_link_libraries(_arrow_flight PRIVATE _protobuf grpc++)
target_include_directories(_arrow_flight PRIVATE ${ARROW_GENERATED_SRC_DIR})
target_include_directories(_arrow_flight PUBLIC ${ARROW_GENERATED_SRC_DIR})
13 changes: 10 additions & 3 deletions contrib/grpc-cmake/protobuf_generate_grpc.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ protobuf_generate_grpc_cpp(<SRCS> <HDRS>
Variable to define with autogenerated header files
``DESCRIPTORS``
Variable to define with autogenerated descriptor files, if requested.
``PROTOC_OUT_DIR``
Output directory for generated sources
``EXPORT_MACRO``
is a macro which should expand to ``__declspec(dllexport)`` or
``__declspec(dllimport)`` depending on what is being compiled.
Expand All @@ -31,22 +33,27 @@ function(PROTOBUF_GENERATE_GRPC_CPP SRCS HDRS)
set(NATIVE_protoc $<TARGET_FILE:protoc>)
endif()

cmake_parse_arguments(protobuf_generate_grpc_cpp "" "EXPORT_MACRO;DESCRIPTORS" "" ${ARGN})
cmake_parse_arguments(protobuf_generate_grpc_cpp "APPEND_PATH" "EXPORT_MACRO;DESCRIPTORS;PROTOC_OUT_DIR" "" ${ARGN})

set(_proto_files "${protobuf_generate_grpc_cpp_UNPARSED_ARGUMENTS}")
if(NOT _proto_files)
message(SEND_ERROR "Error: PROTOBUF_GENERATE_GRPC_CPP() called without any proto files")
return()
endif()

if(PROTOBUF_GENERATE_GRPC_CPP_APPEND_PATH)
if(protobuf_generate_grpc_cpp_APPEND_PATH)
set(_append_arg APPEND_PATH)
endif()

if(protobuf_generate_grpc_cpp_DESCRIPTORS)
set(_descriptors DESCRIPTORS)
endif()

set(_protoc_out_dir_arg)
if(protobuf_generate_grpc_cpp_PROTOC_OUT_DIR)
set(_protoc_out_dir_arg PROTOC_OUT_DIR ${protobuf_generate_grpc_cpp_PROTOC_OUT_DIR})
endif()

if(DEFINED PROTOBUF_IMPORT_DIRS AND NOT DEFINED Protobuf_IMPORT_DIRS)
set(Protobuf_IMPORT_DIRS "${PROTOBUF_IMPORT_DIRS}")
endif()
Expand All @@ -56,7 +63,7 @@ function(PROTOBUF_GENERATE_GRPC_CPP SRCS HDRS)
endif()

set(_outvar)
protobuf_generate_grpc(${_append_arg} ${_descriptors} LANGUAGE cpp EXPORT_MACRO ${protobuf_generate_cpp_EXPORT_MACRO} OUT_VAR _outvar ${_import_arg} PROTOS ${_proto_files})
protobuf_generate_grpc(${_append_arg} ${_descriptors} LANGUAGE cpp EXPORT_MACRO ${protobuf_generate_grpc_cpp_EXPORT_MACRO} OUT_VAR _outvar ${_import_arg} ${_protoc_out_dir_arg} PROTOS ${_proto_files})

set(${SRCS})
set(${HDRS})
Expand Down
4 changes: 2 additions & 2 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@
#include <Server/ProxyV1HandlerFactory.h>
#include <Server/TLSHandlerFactory.h>
#include <Server/KeeperHTTPHandlerFactory.h>
#include <Server/ArrowFlightHandler.h>
#include <Server/ArrowFlight/ArrowFlightServer.h>
#include <Interpreters/AsynchronousInsertQueue.h>

#include <filesystem>
Expand Down Expand Up @@ -3473,7 +3473,7 @@ void Server::createServers(
listen_host,
port_name,
"Arrow Flight compatibility protocol: " + address.toString(),
std::unique_ptr<IGRPCServer>(new ArrowFlightHandler(*this, makeSocketAddress(listen_host, port, &logger()))),
std::unique_ptr<IGRPCServer>(new ArrowFlightServer(*this, makeSocketAddress(listen_host, port, &logger()))),
true);
});
}
Expand Down
3 changes: 3 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,9 @@ endif()
if (TARGET ch_contrib::ssh)
add_object_library(clickhouse_server_ssh Server/SSH)
endif()
if (TARGET ch_contrib::arrow_flight)
add_object_library(clickhouse_server_arrowflight Server/ArrowFlight)
endif()
add_object_library(clickhouse_server_embedded_client Server/ClientEmbedded)
add_object_library(clickhouse_formats Formats)
add_object_library(clickhouse_processors Processors)
Expand Down
6 changes: 6 additions & 0 deletions src/Core/FormatFactorySettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,9 @@ Write enum using parquet physical type: BYTE_ARRAY and logical type: ENUM
)", 0) \
DECLARE(Bool, output_format_parquet_write_checksums, true, R"(
Put crc32 checksums in parquet page headers.
)", 0) \
DECLARE(Bool, output_format_parquet_unsupported_types_as_binary, false, R"(
Output types having no conversion as raw binary data. If false - such types would raise UNKNOWN_TYPE exception.
)", 0) \
DECLARE(String, output_format_avro_codec, "", R"(
Compression codec used for output. Possible values: 'null', 'deflate', 'snappy', 'zstd'.
Expand Down Expand Up @@ -1407,6 +1410,9 @@ Compression method for Arrow output format. Supported codecs: lz4_frame, zstd, n
)", 0) \
DECLARE(Bool, output_format_arrow_date_as_uint16, false, R"(
Write Date values as plain 16-bit numbers (read back as UInt16), instead of converting to a 32-bit Arrow DATE32 type (read back as Date32).
)", 0) \
DECLARE(Bool, output_format_arrow_unsupported_types_as_binary, true, R"(
Output types having no conversion as raw binary data. If false - such types would raise UNKNOWN_TYPE exception.
)", 0) \
\
DECLARE(Bool, output_format_orc_string_as_string, true, R"(
Expand Down
2 changes: 2 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "26.3.1.20001.altinityantalya",
{
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"output_format_arrow_unsupported_types_as_binary", false, true, "New setting to convert unsupported CH types to arrow binary instead of UNKNOWN_TYPE exception."},
{"output_format_parquet_unsupported_types_as_binary", false, false, "New setting to convert unsupported CH types to parquet (arrow) binary instead of UNKNOWN_TYPE exception."},
});
addSettingsChanges(settings_changes_history, "26.3",
{
Expand Down
73 changes: 73 additions & 0 deletions src/DataTypes/NestedUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@
#include <Common/typeid_cast.h>

#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNested.h>

#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnConst.h>

Expand Down Expand Up @@ -121,6 +124,76 @@ std::string extractTableName(const std::string & nested_name)
}


/// Turns a Nullable(Tuple(...)) column into a plain Tuple(...) column by pushing the
/// struct-level null map down into the tuple elements.
///
/// - Columns whose type is not Nullable(Tuple(...)) are returned unchanged.
/// - If the null map contains no nulls, or the tuple has no elements to receive them,
///   the Nullable wrapper is simply stripped and the nested column is returned as is.
/// - Otherwise each element is rewritten:
///     * already-Nullable element: its null map is OR-ed with the struct null map;
///     * element that can be wrapped in Nullable: it becomes Nullable(T) sharing the struct null map;
///     * any other element: rows that are null at struct level are replaced with the type default.
///
/// The returned column keeps the original name; explicit tuple element names are preserved.
ColumnWithTypeAndName unwrapNullableTuple(const ColumnWithTypeAndName & column)
{
    const auto * type_nullable = typeid_cast<const DataTypeNullable *>(column.type.get());
    if (!type_nullable)
        return column;

    const auto * tuple_type = typeid_cast<const DataTypeTuple *>(type_nullable->getNestedType().get());
    if (!tuple_type)
        return column;

    const auto & col_nullable = assert_cast<const ColumnNullable &>(*column.column);

    /// The struct-level null map is read once and reused by every branch below.
    const auto & struct_null_map = col_nullable.getNullMapData();
    const size_t num_elements = tuple_type->getElements().size();

    const bool has_nulls = !memoryIsZero(struct_null_map.data(), 0, struct_null_map.size());

    /// Nothing to propagate: either there are no nulls, or the tuple has no elements that could
    /// receive them. In the latter case we must not rebuild the column from an empty element
    /// list, because such a list carries no row count.
    /// NOTE(review): presumably ColumnTuple::create rejects an empty element list - confirm.
    if (!has_nulls || num_elements == 0)
        return {col_nullable.getNestedColumnPtr(), type_nullable->getNestedType(), column.name};

    /// Propagate the struct null map to each Tuple element.
    const auto & inner_tuple = assert_cast<const ColumnTuple &>(col_nullable.getNestedColumn());
    const auto & null_map_ptr = col_nullable.getNullMapColumnPtr();

    Columns new_elements;
    DataTypes new_types;
    new_elements.reserve(num_elements);
    new_types.reserve(num_elements);

    for (size_t i = 0; i < num_elements; ++i)
    {
        auto elem_col = inner_tuple.getColumnPtr(i);
        auto elem_type = tuple_type->getElement(i);

        if (elem_type->isNullable())
        {
            /// Element already Nullable - merge null maps (struct null OR element null).
            const auto & existing = assert_cast<const ColumnNullable &>(*elem_col);
            const auto & elem_null_map = existing.getNullMapData();

            auto merged = ColumnUInt8::create(struct_null_map.size());
            auto & merged_data = merged->getData();
            for (size_t row = 0; row < struct_null_map.size(); ++row)
                merged_data[row] = struct_null_map[row] | elem_null_map[row];

            new_elements.push_back(ColumnNullable::create(existing.getNestedColumnPtr(), std::move(merged)));
            new_types.push_back(elem_type);
        }
        else if (elem_type->canBeInsideNullable())
        {
            /// Plain element - wrap it in Nullable, sharing the struct null map column.
            new_elements.push_back(ColumnNullable::create(elem_col, null_map_ptr));
            new_types.push_back(std::make_shared<DataTypeNullable>(elem_type));
        }
        else
        {
            /// Array, Map, etc. - replace values at null positions with type defaults.
            auto mutable_col = elem_col->cloneEmpty();
            for (size_t row = 0; row < elem_col->size(); ++row)
            {
                if (struct_null_map[row])
                    mutable_col->insertDefault();
                else
                    mutable_col->insertFrom(*elem_col, row);
            }
            new_elements.push_back(std::move(mutable_col));
            new_types.push_back(elem_type);
        }
    }

    /// Keep explicit element names so that named tuples stay named after unwrapping.
    auto result_type = tuple_type->hasExplicitNames() ? std::make_shared<DataTypeTuple>(std::move(new_types), tuple_type->getElementNames())
                                                      : std::make_shared<DataTypeTuple>(std::move(new_types));
    return {ColumnTuple::create(std::move(new_elements)), result_type, column.name};
}

static Block flattenImpl(const Block & block, bool flatten_named_tuple)
{
Block res;
Expand Down
7 changes: 7 additions & 0 deletions src/DataTypes/NestedUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ namespace Nested
/// Convert old-style nested (single arrays with same prefix, `n.a`, `n.b`...) to subcolumns of data type Nested.
NamesAndTypesList convertToSubcolumns(const NamesAndTypesList & names_and_types);

/// Unwrap Nullable(Tuple(...)) into Tuple(...) by propagating the struct-level null map
/// to each element. Scalar elements become Nullable(T), already-Nullable elements get merged
/// null maps, and non-nullable-compatible elements (Array, Map) get defaults at null positions.
/// When there are no actual nulls, simply strips the Nullable wrapper.
/// Used by format readers (Arrow, ORC) to convert Nullable struct elements for Nested flattening.
ColumnWithTypeAndName unwrapNullableTuple(const ColumnWithTypeAndName & column);

/// Check that sizes of arrays - elements of nested data structures - are equal.
void validateArraySizes(const Block & block);

Expand Down
2 changes: 2 additions & 0 deletions src/Formats/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.parquet.max_dictionary_size = settings[Setting::output_format_parquet_max_dictionary_size];
format_settings.parquet.output_enum_as_byte_array = settings[Setting::output_format_parquet_enum_as_byte_array];
format_settings.parquet.write_checksums = settings[Setting::output_format_parquet_write_checksums];
format_settings.parquet.output_unsupported_types_as_binary = settings[Setting::output_format_parquet_unsupported_types_as_binary];
format_settings.parquet.max_block_size = settings[Setting::input_format_parquet_max_block_size];
format_settings.parquet.prefer_block_bytes = settings[Setting::input_format_parquet_prefer_block_bytes];
format_settings.parquet.output_compression_method = settings[Setting::output_format_parquet_compression_method];
Expand Down Expand Up @@ -313,6 +314,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
format_settings.arrow.output_fixed_string_as_fixed_byte_array = settings[Setting::output_format_arrow_fixed_string_as_fixed_byte_array];
format_settings.arrow.output_compression_method = settings[Setting::output_format_arrow_compression_method];
format_settings.arrow.output_date_as_uint16 = settings[Setting::output_format_arrow_date_as_uint16];
format_settings.arrow.output_unsupported_types_as_binary = settings[Setting::output_format_arrow_unsupported_types_as_binary];
format_settings.orc.allow_missing_columns = settings[Setting::input_format_orc_allow_missing_columns];
format_settings.orc.row_batch_size = settings[Setting::input_format_orc_row_batch_size];
format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference = settings[Setting::input_format_orc_skip_columns_with_unsupported_types_in_schema_inference];
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/FormatSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ struct FormatSettings
bool output_fixed_string_as_fixed_byte_array = true;
ArrowCompression output_compression_method = ArrowCompression::NONE;
bool output_date_as_uint16 = false;
bool output_unsupported_types_as_binary = true;
} arrow{};

struct
Expand Down Expand Up @@ -348,6 +349,7 @@ struct FormatSettings
bool allow_geoparquet_parser = true;
bool write_geometadata = true;
size_t max_dictionary_size = 1024 * 1024;
bool output_unsupported_types_as_binary = false;
} parquet{};

struct Pretty
Expand Down
26 changes: 26 additions & 0 deletions src/Formats/insertNullAsDefaultIfNeeded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,27 @@ bool insertNullAsDefaultIfNeeded(ColumnWithTypeAndName & input_column, const Col
return true;
}

/// When both input and header are Nullable, unwrap and recurse into the nested types.
/// This handles cases such as Nullable(Tuple(Nullable(Int32), String)) vs Nullable(Tuple(UInt32, String)).
if (input_column.type->isNullable() && header_column.type->isNullable())
{
ColumnWithTypeAndName nested_input;
nested_input.column = assert_cast<const ColumnNullable *>(input_column.column.get())->getNestedColumnPtr();
nested_input.type = removeNullable(input_column.type);

ColumnWithTypeAndName nested_header;
nested_header.column = assert_cast<const ColumnNullable *>(header_column.column.get())->getNestedColumnPtr();
nested_header.type = removeNullable(header_column.type);

if (!insertNullAsDefaultIfNeeded(nested_input, nested_header, 0, nullptr))
return false;

input_column.column = ColumnNullable::create(
nested_input.column, assert_cast<const ColumnNullable *>(input_column.column.get())->getNullMapColumnPtr());
input_column.type = std::make_shared<DataTypeNullable>(std::move(nested_input.type));
return true;
}

if (!isNullableOrLowCardinalityNullable(input_column.type) || isNullableOrLowCardinalityNullable(header_column.type))
return false;

Expand All @@ -118,6 +139,11 @@ bool insertNullAsDefaultIfNeeded(ColumnWithTypeAndName & input_column, const Col
input_column.type = std::make_shared<DataTypeLowCardinality>(removeNullable(lc_type->getDictionaryType()));
}

/// After stripping the outer Nullable, the inner type may also need processing.
/// For example, Nullable(Tuple(Nullable(Int), String)) -> Tuple(Nullable(Int), String)
/// still needs the Tuple elements compared against the header to strip inner Nullable.
insertNullAsDefaultIfNeeded(input_column, header_column, column_i, block_missing_values);

return true;
}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ArrowBlockOutputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void ArrowBlockOutputFormat::consume(Chunk chunk)
.use_signed_indexes_for_dictionary = format_settings.arrow.use_signed_indexes_for_dictionary,
.use_64_bit_indexes_for_dictionary = format_settings.arrow.use_64_bit_indexes_for_dictionary,
.output_date_as_uint16 = format_settings.arrow.output_date_as_uint16,
.output_unsupported_types_as_binary = format_settings.arrow.output_unsupported_types_as_binary,
});
}

Expand Down
Loading
Loading