Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions be/src/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
state == nullptr ? true : state->query_options().enable_orc_filter_by_min_max),
_dict_cols_has_converted(false) {
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
VecDateTimeValue t;
t.from_unixtime(0, ctz);
_offset_days = t.day() == 31 ? -1 : 0; // If 1969-12-31, then returns -1.
_meta_cache = meta_cache;
_init_profile();
_init_system_properties();
Expand Down
5 changes: 2 additions & 3 deletions be/src/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,8 @@ class OrcReader : public GenericReader {
}
}

// because the date api argument is int32_t, we should cast to int32_t.
int32_t date_value = cast_set<int32_t>(data->data[i]) + _offset_days;
// ORC DATE stores a logical day count without time zone semantics.
int32_t date_value = cast_set<int32_t>(data->data[i]);
if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
v.create_from_date_v2(date_dict[date_value], TIME_DATE);
// we should cast to date if using date v1.
Expand Down Expand Up @@ -655,7 +655,6 @@ class OrcReader : public GenericReader {
int64_t _range_size;
std::string _ctz;

int32_t _offset_days = 0;
cctz::time_zone _time_zone;

// The columns of the table to be read (contain columns that do not exist)
Expand Down
10 changes: 1 addition & 9 deletions be/src/format/parquet/parquet_column_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ struct ConvertParams {
static const cctz::time_zone utc0;
// schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set local time zone
const cctz::time_zone* ctz = nullptr;
size_t offset_days = 0;
int64_t second_mask = 1;
int64_t scale_to_nano_factor = 1;
const FieldSchema* field_schema = nullptr;
Expand Down Expand Up @@ -110,11 +109,6 @@ struct ConvertParams {
}
}

if (ctz) {
VecDateTimeValue t;
t.from_unixtime(0, *ctz);
offset_days = t.day() == 31 ? -1 : 0;
}
is_type_compatibility = field_schema_->is_type_compatibility;
}
};
Expand Down Expand Up @@ -642,9 +636,7 @@ class Int32ToDate : public PhysicalToLogicalConverter {
date_day_offset_dict& date_dict = date_day_offset_dict::get();

for (int i = 0; i < rows; i++) {
int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days;
data.push_back_without_reserve(
date_dict[cast_set<int32_t>(date_value)].to_date_int_val());
data.push_back_without_reserve(date_dict[src_data[i]].to_date_int_val());
}

return Status::OK();
Expand Down
36 changes: 28 additions & 8 deletions be/test/format/orc/orc_read_lines.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class OrcReadLinesTest : public testing::Test {
OrcReadLinesTest() {}
};

static void read_orc_line(int64_t line, std::string block_dump) {
static void read_orc_line(int64_t line, std::string block_dump,
const std::string& time_zone = "CST") {
auto runtime_state = RuntimeState::create_unique();

std::vector<std::string> column_names = {"col1", "col2", "col3", "col4", "col5",
Expand Down Expand Up @@ -119,7 +120,6 @@ static void read_orc_line(int64_t line, std::string block_dump) {
io::IOContext io_ctx;
io::FileReaderStats file_reader_stats;
io_ctx.file_reader_stats = &file_reader_stats;
std::string time_zone = "CST";
auto reader = OrcReader::create_unique(nullptr, runtime_state.get(), params, range, 100,
time_zone, &io_ctx, nullptr, true);
auto local_fs = io::global_local_filesystem();
Expand All @@ -143,7 +143,8 @@ static void read_orc_line(int64_t line, std::string block_dump) {
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
std::unordered_map<std::string, VExprContextSPtr> missing_columns;
static_cast<void>(reader->set_fill_columns(partition_columns, missing_columns));
auto st = reader->set_fill_columns(partition_columns, missing_columns);
EXPECT_TRUE(st.ok()) << st;
BlockUPtr block = Block::create_unique();
for (const auto& slot_desc : tuple_desc->slots()) {
auto data_type = slot_desc->type();
Expand All @@ -158,7 +159,8 @@ static void read_orc_line(int64_t line, std::string block_dump) {

bool eof = false;
size_t read_row = 0;
static_cast<void>(reader->get_next_block(block.get(), &read_row, &eof));
st = reader->get_next_block(block.get(), &read_row, &eof);
EXPECT_TRUE(st.ok()) << st;
auto row_id_string_column = static_cast<const ColumnString&>(
*block->get_by_position(block->get_position_by_name("row_id")).column.get());
for (auto i = 0; i < row_id_string_column.size(); i++) {
Expand All @@ -185,7 +187,7 @@ static void read_orc_line(int64_t line, std::string block_dump) {
slot_info.is_file_slot = true;
params.required_slots.emplace_back(slot_info);
}
runtime_state->_timezone = "CST";
runtime_state->_timezone = time_zone;

std::unique_ptr<RuntimeProfile> runtime_profile;
runtime_profile = std::make_unique<RuntimeProfile>("ExternalRowIDFetcher");
Expand All @@ -196,9 +198,9 @@ static void read_orc_line(int64_t line, std::string block_dump) {
ExternalFileMappingInfo external_info(0, range, false);
int64_t init_reader_ms = 0;
int64_t get_block_ms = 0;
auto st = vf->read_lines_from_range(range, {line}, block.get(), external_info, &init_reader_ms,
&get_block_ms);
EXPECT_TRUE(st.ok());
st = vf->read_lines_from_range(range, {line}, block.get(), external_info, &init_reader_ms,
&get_block_ms);
EXPECT_TRUE(st.ok()) << st;
EXPECT_EQ(block->dump_data(1), block_dump);
}

Expand Down Expand Up @@ -375,4 +377,22 @@ TEST_F(OrcReadLinesTest, test9) {
read_orc_line(9, block_dump);
}

// Regression test: reading an ORC file with a session time zone west of UTC
// (America/Mexico_City, UTC-6) must not shift DATE values backwards by one
// day. The expected dump pins col4 (DateV2) to 1900-01-01; a time-zone-based
// offset would have produced 1899-12-31 instead.
// NOTE(review): assumes read_orc_line()'s time_zone parameter drives the
// reader's session time zone — confirm against the helper above.
TEST_F(OrcReadLinesTest, date_should_not_shift_in_west_timezone) {
    // Expected single-row dump; only col4 is time-zone sensitive here.
    std::string block_dump =
            "+----------------------+--------------------+----------------------+------------------"
            "----+----------------------+---------------------+-------------------+----------------"
            "--------+----------------------+\n|col1(Nullable(BIGINT))|col2(Nullable(BOOL))|col3("
            "Nullable(String))|col4(Nullable(DateV2))|col5(Nullable(DOUBLE))|col6(Nullable(FLOAT))|"
            "col7(Nullable(INT))|col8(Nullable(SMALLINT))|col9(Nullable(String))|\n+---------------"
            "-------+--------------------+----------------------+----------------------+-----------"
            "-----------+---------------------+-------------------+------------------------+-------"
            "---------------+\n|                     1|                   1|                 "
            "doris|            1900-01-01|                 1.567|                1.567|             "
            " 12345|                       1|                 "
            "doris|\n+----------------------+--------------------+----------------------+----------"
            "------------+----------------------+---------------------+-------------------+--------"
            "----------------+----------------------+\n";
    read_orc_line(1, block_dump, "America/Mexico_City");
}

} // namespace doris
70 changes: 70 additions & 0 deletions be/test/format/parquet/parquet_expr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,69 @@ class ParquetExprTest : public testing::Test {
p_reader->_ctz = &ctz;
}

// Reads the "date_col" (DATEV2) column from `file_path` through a
// ParquetReader whose session time zone is `timezone_name`, and returns the
// concatenated dump of every block read.
//
// DATE is a time-zone-free logical day count, so the returned dump must be
// identical regardless of `timezone_name`; callers assert on that to catch
// regressions where dates are shifted by the time zone's UTC offset.
std::string read_date_column_dump(const std::string& timezone_name) {
    TDescriptorTable local_desc_table;
    TTableDescriptor local_table_desc;
    create_table_desc(local_desc_table, local_table_desc, {"date_col"},
                      {TPrimitiveType::DATEV2});
    DescriptorTbl* local_desc_tbl = nullptr;
    ObjectPool local_obj_pool;
    // Check every status instead of discarding it (consistent with the ORC
    // read-lines test): a silently failed setup step would make the caller's
    // string comparison pass or fail for the wrong reason.
    Status st = DescriptorTbl::create(&local_obj_pool, local_desc_table, &local_desc_tbl);
    EXPECT_TRUE(st.ok()) << st;

    auto tuple_desc = local_desc_tbl->get_tuple_descriptor(0);
    auto slot_descs = tuple_desc->slots();
    auto local_fs = io::global_local_filesystem();
    io::FileReaderSPtr local_file_reader;
    st = local_fs->open_file(file_path, &local_file_reader);
    EXPECT_TRUE(st.ok()) << st;

    cctz::time_zone local_ctz;
    TimezoneUtils::find_cctz_time_zone(timezone_name, local_ctz);

    std::vector<std::string> column_names;
    std::unordered_map<std::string, uint32_t> col_name_to_block_idx;
    for (size_t i = 0; i < slot_descs.size(); i++) {
        column_names.push_back(slot_descs[i]->col_name());
        col_name_to_block_idx[slot_descs[i]->col_name()] = i;
    }

    TFileScanRangeParams scan_params;
    TFileRangeDesc scan_range;
    scan_range.start_offset = 0;
    scan_range.size = local_file_reader->size();

    auto local_reader = ParquetReader::create_unique(
            nullptr, scan_params, scan_range, scan_range.size, &local_ctz, nullptr, nullptr);
    local_reader->set_file_reader(local_file_reader);
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
    st = local_reader->init_reader(column_names, &col_name_to_block_idx, {}, tmp, tuple_desc,
                                   nullptr, nullptr, nullptr, nullptr);
    EXPECT_TRUE(st.ok()) << st;

    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
    st = local_reader->set_fill_columns(partition_columns, missing_columns);
    EXPECT_TRUE(st.ok()) << st;

    // Drain the file block by block, accumulating each block's textual dump.
    bool eof = false;
    std::string dump;
    while (!eof) {
        BlockUPtr block = Block::create_unique();
        for (const auto& slot_desc : tuple_desc->slots()) {
            auto data_type = make_nullable(slot_desc->type());
            MutableColumnPtr data_column = data_type->create_column();
            block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
                                                slot_desc->col_name()));
        }

        size_t read_row = 0;
        st = local_reader->get_next_block(block.get(), &read_row, &eof);
        EXPECT_TRUE(st.ok()) << st;
        dump += block->dump_data();
    }
    return dump;
}

static void create_table_desc(TDescriptorTable& t_desc_table, TTableDescriptor& t_table_desc,
std::vector<std::string> table_column_names,
std::vector<TPrimitiveType::type> types) {
Expand Down Expand Up @@ -400,6 +463,13 @@ TEST_F(ParquetExprTest, test_min_max) {
}
}

// Regression test: a session time zone west of UTC (fixed offset -06:00)
// must not shift Parquet DATE values backwards by one day.
TEST_F(ParquetExprTest, date_should_not_shift_in_west_timezone) {
    std::string dump = read_date_column_dump("-06:00");
    // Presumably the fixture file stores 2020-01-01 and 2020-01-06 — verify
    // against the test parquet file. A one-day westward shift would surface
    // 2019-12-31, which must therefore be absent from the dump.
    EXPECT_NE(dump.find("2020-01-01"), std::string::npos);
    EXPECT_NE(dump.find("2020-01-06"), std::string::npos);
    EXPECT_EQ(dump.find("2019-12-31"), std::string::npos);
}

TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 10000000001 [10000000000 , 10000000000+3)
// int64_col = 10000000001 [10000000000 , 10000000000+3)
int loc = 2;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !orc_date_utc --
2023-10-22
2020-01-01
\N
\N
\N
2019-12-31
2022-05-20
\N
2023-01-01
2023-01-01
2023-01-01
2023-01-01

-- !parquet_date_utc --
2023-10-22
2020-01-01
\N
\N
\N
2019-12-31
2022-05-20
\N
2023-01-01
2023-01-01
2023-01-01
2023-01-01

-- !orc_date_west_tz --
2023-10-22
2020-01-01
\N
\N
\N
2019-12-31
2022-05-20
\N
2023-01-01
2023-01-01
2023-01-01
2023-01-01

-- !parquet_date_west_tz --
2023-10-22
2020-01-01
\N
\N
\N
2019-12-31
2022-05-20
\N
2023-01-01
2023-01-01
2023-01-01
2023-01-01
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: DATE columns read from Hive ORC/Parquet tables must not
// shift by one day when the session time zone is west of UTC. The same two
// tables are queried under UTC and under America/Mexico_City (UTC-6); the
// paired .out sections are expected to be identical.
suite("test_hive_date_timezone", "p0,external") {
    String enabled = context.config.otherConfigs.get("enableHiveTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        // Fixed typo: "diable" -> "disable".
        logger.info("disable Hive test.")
        return
    }

    for (String hivePrefix : ["hive3"]) {
        setHivePrefix(hivePrefix)
        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
        String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
        String hdfsPort = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
        String catalogName = "test_hive_date_timezone_${hivePrefix}"

        // Recreate the catalog from scratch so stale metadata cannot leak in.
        sql """drop catalog if exists ${catalogName}"""
        sql """
            create catalog if not exists ${catalogName} properties (
                'type'='hms',
                'hadoop.username' = 'hadoop',
                'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfsPort}',
                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}'
            );
        """

        try {
            sql """set enable_fallback_to_original_planner=false"""
            sql """switch ${catalogName}"""
            sql """use `schema_change`"""

            // Baseline: UTC must render the stored dates verbatim.
            sql """set time_zone = 'UTC'"""
            qt_orc_date_utc """select date_col from orc_primitive_types_to_date order by id"""
            qt_parquet_date_utc """select date_col from parquet_primitive_types_to_date order by id"""

            // West-of-UTC zone: results must match the UTC baseline exactly.
            sql """set time_zone = 'America/Mexico_City'"""
            qt_orc_date_west_tz """select date_col from orc_primitive_types_to_date order by id"""
            qt_parquet_date_west_tz """select date_col from parquet_primitive_types_to_date order by id"""
        } finally {
            // Restore session state and drop the catalog even on failure.
            sql """set time_zone = default"""
            sql """switch internal"""
            sql """drop catalog if exists ${catalogName}"""
        }
    }
}
Loading