Skip to content

Commit 024432f

Browse files
committed
test
1 parent 7694dad commit 024432f

3 files changed

Lines changed: 17 additions & 2 deletions

File tree

be/src/common/config.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1680,6 +1680,10 @@ DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
16801680
// The maximum csv line reader output buffer size
16811681
DEFINE_mInt64(max_csv_line_reader_output_buffer_size, "4294967296");
16821682

1683+
// Soft cap on the raw line bytes accumulated into a single block by CsvReader::get_next_block (checked before each line is read, so the last line may exceed it).
1684+
// Default is 200MB (209715200 bytes). Set to 0 or a negative value to disable the limit.
1685+
DEFINE_mInt64(csv_reader_max_block_bytes, "209715200");
1686+
16831687
// Maximum number of OpenMP threads allowed for concurrent vector index builds.
16841688
// -1 means auto: use 80% of the available CPU cores.
16851689
DEFINE_Int32(omp_threads_limit, "-1");

be/src/common/config.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1770,6 +1770,10 @@ DECLARE_String(fuzzy_test_type);
17701770
// The maximum csv line reader output buffer size
17711771
DECLARE_mInt64(max_csv_line_reader_output_buffer_size);
17721772

1773+
// Soft cap on the raw line bytes accumulated into a single block by CsvReader::get_next_block (checked before each line is read, so the last line may exceed it).
1774+
// Default is 200MB (209715200 bytes). Set to 0 or a negative value to disable the limit.
1775+
DECLARE_mInt64(csv_reader_max_block_bytes);
1776+
17731777
// Maximum number of OpenMP threads available for concurrent index builds.
17741778
// -1 means auto: use 80% of detected CPU cores.
17751779
DECLARE_Int32(omp_threads_limit);

be/src/format/csv/csv_reader.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <utility>
3232

3333
#include "common/compiler_util.h" // IWYU pragma: keep
34+
#include "common/config.h"
3435
#include "common/consts.h"
3536
#include "common/status.h"
3637
#include "core/block/block.h"
@@ -312,12 +313,15 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
312313
}
313314

314315
const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
316+
const int64_t max_block_bytes = config::csv_reader_max_block_bytes;
315317
size_t rows = 0;
318+
size_t block_bytes = 0;
316319

317320
bool success = false;
318321
bool is_remove_bom = false;
319322
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
320-
while (rows < batch_size && !_line_reader_eof) {
323+
while (rows < batch_size && !_line_reader_eof &&
324+
(max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) {
321325
const uint8_t* ptr = nullptr;
322326
size_t size = 0;
323327
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
@@ -345,6 +349,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
345349

346350
RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
347351
++rows;
352+
block_bytes += size;
348353
}
349354
auto mutate_columns = block->mutate_columns();
350355
for (auto& col : mutate_columns) {
@@ -353,7 +358,8 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
353358
block->set_columns(std::move(mutate_columns));
354359
} else {
355360
auto columns = block->mutate_columns();
356-
while (rows < batch_size && !_line_reader_eof) {
361+
while (rows < batch_size && !_line_reader_eof &&
362+
(max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) {
357363
const uint8_t* ptr = nullptr;
358364
size_t size = 0;
359365
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
@@ -384,6 +390,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
384390
continue;
385391
}
386392
RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
393+
block_bytes += size;
387394
}
388395
block->set_columns(std::move(columns));
389396
}

0 commit comments

Comments
 (0)