Skip to content

Commit c34b80a

Browse files
branch-4.1: [fix](hive) Fix Hive DATE timezone shift in external readers #61330 (#61722)
Cherry-pick #61330 to branch-4.1 ### What problem does this PR solve? - Related PR: #61330 Fix Hive external table DATE columns being shifted by one day in west time zones when reading ORC/Parquet files. This backport keeps DATE semantics time-zone-independent for Hive external ORC/Parquet reads and includes the matching unit and regression coverage from the merged master change. ### Cherry-pick commit - `18e5dda9732` - [fix](hive) Fix Hive DATE timezone shift in external readers (#61330)
1 parent 2dabab5 commit c34b80a

7 files changed

Lines changed: 217 additions & 23 deletions

File tree

be/src/format/orc/vorc_reader.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
191191
state == nullptr ? true : state->query_options().enable_orc_filter_by_min_max),
192192
_dict_cols_has_converted(false) {
193193
TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
194-
VecDateTimeValue t;
195-
t.from_unixtime(0, ctz);
196-
_offset_days = t.day() == 31 ? -1 : 0; // If 1969-12-31, then returns -1.
197194
_meta_cache = meta_cache;
198195
_init_profile();
199196
_init_system_properties();

be/src/format/orc/vorc_reader.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -511,8 +511,8 @@ class OrcReader : public GenericReader {
511511
}
512512
}
513513

514-
// because the date api argument is int32_t, we should cast to int32_t.
515-
int32_t date_value = cast_set<int32_t>(data->data[i]) + _offset_days;
514+
// ORC DATE stores a logical day count without time zone semantics.
515+
int32_t date_value = cast_set<int32_t>(data->data[i]);
516516
if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
517517
v.create_from_date_v2(date_dict[date_value], TIME_DATE);
518518
// we should cast to date if using date v1.
@@ -655,7 +655,6 @@ class OrcReader : public GenericReader {
655655
int64_t _range_size;
656656
std::string _ctz;
657657

658-
int32_t _offset_days = 0;
659658
cctz::time_zone _time_zone;
660659

661660
// The columns of the table to be read (contain columns that do not exist)

be/src/format/parquet/parquet_column_convert.h

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ struct ConvertParams {
3939
static const cctz::time_zone utc0;
4040
// schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set local time zone
4141
const cctz::time_zone* ctz = nullptr;
42-
size_t offset_days = 0;
4342
int64_t second_mask = 1;
4443
int64_t scale_to_nano_factor = 1;
4544
const FieldSchema* field_schema = nullptr;
@@ -110,11 +109,6 @@ struct ConvertParams {
110109
}
111110
}
112111

113-
if (ctz) {
114-
VecDateTimeValue t;
115-
t.from_unixtime(0, *ctz);
116-
offset_days = t.day() == 31 ? -1 : 0;
117-
}
118112
is_type_compatibility = field_schema_->is_type_compatibility;
119113
}
120114
};
@@ -642,9 +636,7 @@ class Int32ToDate : public PhysicalToLogicalConverter {
642636
date_day_offset_dict& date_dict = date_day_offset_dict::get();
643637

644638
for (int i = 0; i < rows; i++) {
645-
int64_t date_value = (int64_t)src_data[i] + _convert_params->offset_days;
646-
data.push_back_without_reserve(
647-
date_dict[cast_set<int32_t>(date_value)].to_date_int_val());
639+
data.push_back_without_reserve(date_dict[src_data[i]].to_date_int_val());
648640
}
649641

650642
return Status::OK();

be/test/format/orc/orc_read_lines.cpp

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ class OrcReadLinesTest : public testing::Test {
5757
OrcReadLinesTest() {}
5858
};
5959

60-
static void read_orc_line(int64_t line, std::string block_dump) {
60+
static void read_orc_line(int64_t line, std::string block_dump,
61+
const std::string& time_zone = "CST") {
6162
auto runtime_state = RuntimeState::create_unique();
6263

6364
std::vector<std::string> column_names = {"col1", "col2", "col3", "col4", "col5",
@@ -119,7 +120,6 @@ static void read_orc_line(int64_t line, std::string block_dump) {
119120
io::IOContext io_ctx;
120121
io::FileReaderStats file_reader_stats;
121122
io_ctx.file_reader_stats = &file_reader_stats;
122-
std::string time_zone = "CST";
123123
auto reader = OrcReader::create_unique(nullptr, runtime_state.get(), params, range, 100,
124124
time_zone, &io_ctx, nullptr, true);
125125
auto local_fs = io::global_local_filesystem();
@@ -143,7 +143,8 @@ static void read_orc_line(int64_t line, std::string block_dump) {
143143
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
144144
partition_columns;
145145
std::unordered_map<std::string, VExprContextSPtr> missing_columns;
146-
static_cast<void>(reader->set_fill_columns(partition_columns, missing_columns));
146+
auto st = reader->set_fill_columns(partition_columns, missing_columns);
147+
EXPECT_TRUE(st.ok()) << st;
147148
BlockUPtr block = Block::create_unique();
148149
for (const auto& slot_desc : tuple_desc->slots()) {
149150
auto data_type = slot_desc->type();
@@ -158,7 +159,8 @@ static void read_orc_line(int64_t line, std::string block_dump) {
158159

159160
bool eof = false;
160161
size_t read_row = 0;
161-
static_cast<void>(reader->get_next_block(block.get(), &read_row, &eof));
162+
st = reader->get_next_block(block.get(), &read_row, &eof);
163+
EXPECT_TRUE(st.ok()) << st;
162164
auto row_id_string_column = static_cast<const ColumnString&>(
163165
*block->get_by_position(block->get_position_by_name("row_id")).column.get());
164166
for (auto i = 0; i < row_id_string_column.size(); i++) {
@@ -185,7 +187,7 @@ static void read_orc_line(int64_t line, std::string block_dump) {
185187
slot_info.is_file_slot = true;
186188
params.required_slots.emplace_back(slot_info);
187189
}
188-
runtime_state->_timezone = "CST";
190+
runtime_state->_timezone = time_zone;
189191

190192
std::unique_ptr<RuntimeProfile> runtime_profile;
191193
runtime_profile = std::make_unique<RuntimeProfile>("ExternalRowIDFetcher");
@@ -196,9 +198,9 @@ static void read_orc_line(int64_t line, std::string block_dump) {
196198
ExternalFileMappingInfo external_info(0, range, false);
197199
int64_t init_reader_ms = 0;
198200
int64_t get_block_ms = 0;
199-
auto st = vf->read_lines_from_range(range, {line}, block.get(), external_info, &init_reader_ms,
200-
&get_block_ms);
201-
EXPECT_TRUE(st.ok());
201+
st = vf->read_lines_from_range(range, {line}, block.get(), external_info, &init_reader_ms,
202+
&get_block_ms);
203+
EXPECT_TRUE(st.ok()) << st;
202204
EXPECT_EQ(block->dump_data(1), block_dump);
203205
}
204206

@@ -375,4 +377,22 @@ TEST_F(OrcReadLinesTest, test9) {
375377
read_orc_line(9, block_dump);
376378
}
377379

380+
TEST_F(OrcReadLinesTest, date_should_not_shift_in_west_timezone) {
381+
std::string block_dump =
382+
"+----------------------+--------------------+----------------------+------------------"
383+
"----+----------------------+---------------------+-------------------+----------------"
384+
"--------+----------------------+\n|col1(Nullable(BIGINT))|col2(Nullable(BOOL))|col3("
385+
"Nullable(String))|col4(Nullable(DateV2))|col5(Nullable(DOUBLE))|col6(Nullable(FLOAT))|"
386+
"col7(Nullable(INT))|col8(Nullable(SMALLINT))|col9(Nullable(String))|\n+---------------"
387+
"-------+--------------------+----------------------+----------------------+-----------"
388+
"-----------+---------------------+-------------------+------------------------+-------"
389+
"---------------+\n| 1| 1| "
390+
"doris| 1900-01-01| 1.567| 1.567| "
391+
" 12345| 1| "
392+
"doris|\n+----------------------+--------------------+----------------------+----------"
393+
"------------+----------------------+---------------------+-------------------+--------"
394+
"----------------+----------------------+\n";
395+
read_orc_line(1, block_dump, "America/Mexico_City");
396+
}
397+
378398
} // namespace doris

be/test/format/parquet/parquet_expr_test.cpp

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,69 @@ class ParquetExprTest : public testing::Test {
292292
p_reader->_ctz = &ctz;
293293
}
294294

295+
std::string read_date_column_dump(const std::string& timezone_name) {
296+
TDescriptorTable local_desc_table;
297+
TTableDescriptor local_table_desc;
298+
create_table_desc(local_desc_table, local_table_desc, {"date_col"},
299+
{TPrimitiveType::DATEV2});
300+
DescriptorTbl* local_desc_tbl = nullptr;
301+
ObjectPool local_obj_pool;
302+
static_cast<void>(
303+
DescriptorTbl::create(&local_obj_pool, local_desc_table, &local_desc_tbl));
304+
305+
auto tuple_desc = local_desc_tbl->get_tuple_descriptor(0);
306+
auto slot_descs = tuple_desc->slots();
307+
auto local_fs = io::global_local_filesystem();
308+
io::FileReaderSPtr local_file_reader;
309+
static_cast<void>(local_fs->open_file(file_path, &local_file_reader));
310+
311+
cctz::time_zone local_ctz;
312+
TimezoneUtils::find_cctz_time_zone(timezone_name, local_ctz);
313+
314+
std::vector<std::string> column_names;
315+
std::unordered_map<std::string, uint32_t> col_name_to_block_idx;
316+
for (int i = 0; i < slot_descs.size(); i++) {
317+
column_names.push_back(slot_descs[i]->col_name());
318+
col_name_to_block_idx[slot_descs[i]->col_name()] = i;
319+
}
320+
321+
TFileScanRangeParams scan_params;
322+
TFileRangeDesc scan_range;
323+
scan_range.start_offset = 0;
324+
scan_range.size = local_file_reader->size();
325+
326+
auto local_reader = ParquetReader::create_unique(
327+
nullptr, scan_params, scan_range, scan_range.size, &local_ctz, nullptr, nullptr);
328+
local_reader->set_file_reader(local_file_reader);
329+
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> tmp;
330+
static_cast<void>(local_reader->init_reader(column_names, &col_name_to_block_idx, {}, tmp,
331+
tuple_desc, nullptr, nullptr, nullptr,
332+
nullptr));
333+
334+
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
335+
partition_columns;
336+
std::unordered_map<std::string, VExprContextSPtr> missing_columns;
337+
static_cast<void>(local_reader->set_fill_columns(partition_columns, missing_columns));
338+
339+
bool eof = false;
340+
std::string dump;
341+
while (!eof) {
342+
BlockUPtr block = Block::create_unique();
343+
for (const auto& slot_desc : tuple_desc->slots()) {
344+
auto data_type = make_nullable(slot_desc->type());
345+
MutableColumnPtr data_column = data_type->create_column();
346+
block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
347+
slot_desc->col_name()));
348+
}
349+
350+
size_t read_row = 0;
351+
Status st = local_reader->get_next_block(block.get(), &read_row, &eof);
352+
EXPECT_TRUE(st.ok()) << st;
353+
dump += block->dump_data();
354+
}
355+
return dump;
356+
}
357+
295358
static void create_table_desc(TDescriptorTable& t_desc_table, TTableDescriptor& t_table_desc,
296359
std::vector<std::string> table_column_names,
297360
std::vector<TPrimitiveType::type> types) {
@@ -400,6 +463,13 @@ TEST_F(ParquetExprTest, test_min_max) {
400463
}
401464
}
402465

466+
TEST_F(ParquetExprTest, date_should_not_shift_in_west_timezone) {
467+
std::string dump = read_date_column_dump("-06:00");
468+
EXPECT_NE(dump.find("2020-01-01"), std::string::npos);
469+
EXPECT_NE(dump.find("2020-01-06"), std::string::npos);
470+
EXPECT_EQ(dump.find("2019-12-31"), std::string::npos);
471+
}
472+
403473
TEST_F(ParquetExprTest, test_ge_2) { // int64_col = 10000000001 [10000000000 , 10000000000+3)
404474
// int64_col = 10000000001 [10000000000 , 10000000000+3)
405475
int loc = 2;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !orc_date_utc --
3+
2023-10-22
4+
2020-01-01
5+
\N
6+
\N
7+
\N
8+
2019-12-31
9+
2022-05-20
10+
\N
11+
2023-01-01
12+
2023-01-01
13+
2023-01-01
14+
2023-01-01
15+
16+
-- !parquet_date_utc --
17+
2023-10-22
18+
2020-01-01
19+
\N
20+
\N
21+
\N
22+
2019-12-31
23+
2022-05-20
24+
\N
25+
2023-01-01
26+
2023-01-01
27+
2023-01-01
28+
2023-01-01
29+
30+
-- !orc_date_west_tz --
31+
2023-10-22
32+
2020-01-01
33+
\N
34+
\N
35+
\N
36+
2019-12-31
37+
2022-05-20
38+
\N
39+
2023-01-01
40+
2023-01-01
41+
2023-01-01
42+
2023-01-01
43+
44+
-- !parquet_date_west_tz --
45+
2023-10-22
46+
2020-01-01
47+
\N
48+
\N
49+
\N
50+
2019-12-31
51+
2022-05-20
52+
\N
53+
2023-01-01
54+
2023-01-01
55+
2023-01-01
56+
2023-01-01
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
suite("test_hive_date_timezone", "p0,external") {
19+
String enabled = context.config.otherConfigs.get("enableHiveTest")
20+
if (enabled == null || !enabled.equalsIgnoreCase("true")) {
21+
logger.info("diable Hive test.")
22+
return
23+
}
24+
25+
for (String hivePrefix : ["hive3"]) {
26+
setHivePrefix(hivePrefix)
27+
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
28+
String hmsPort = context.config.otherConfigs.get(hivePrefix + "HmsPort")
29+
String hdfsPort = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
30+
String catalogName = "test_hive_date_timezone_${hivePrefix}"
31+
32+
sql """drop catalog if exists ${catalogName}"""
33+
sql """
34+
create catalog if not exists ${catalogName} properties (
35+
'type'='hms',
36+
'hadoop.username' = 'hadoop',
37+
'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfsPort}',
38+
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hmsPort}'
39+
);
40+
"""
41+
42+
try {
43+
sql """set enable_fallback_to_original_planner=false"""
44+
sql """switch ${catalogName}"""
45+
sql """use `schema_change`"""
46+
47+
sql """set time_zone = 'UTC'"""
48+
qt_orc_date_utc """select date_col from orc_primitive_types_to_date order by id"""
49+
qt_parquet_date_utc """select date_col from parquet_primitive_types_to_date order by id"""
50+
51+
sql """set time_zone = 'America/Mexico_City'"""
52+
qt_orc_date_west_tz """select date_col from orc_primitive_types_to_date order by id"""
53+
qt_parquet_date_west_tz """select date_col from parquet_primitive_types_to_date order by id"""
54+
} finally {
55+
sql """set time_zone = default"""
56+
sql """switch internal"""
57+
sql """drop catalog if exists ${catalogName}"""
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)