Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cpp/src/common/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ typedef struct ConfigValue {
TSEncoding double_encoding_type_;
TSEncoding string_encoding_type_;
CompressionType default_compression_type_;
// When true, aligned writer enforces page size limit strictly by
// interleaving time/value writes and sealing pages together when any side
// becomes full.
// When false, aligned writer may disable some page-size checks to improve
// write performance.
bool strict_page_size_ = true;
} ConfigValue;

extern void init_config_value();
Expand All @@ -57,6 +63,7 @@ extern void set_config_value();
extern void config_set_page_max_point_count(uint32_t page_max_point_count);
extern void config_set_max_degree_of_index_node(
uint32_t max_degree_of_index_node);
extern void config_set_strict_page_size(bool strict_page_size);

} // namespace common

Expand Down
6 changes: 6 additions & 0 deletions cpp/src/common/global.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ void init_config_value() {
#else
g_config_value_.default_compression_type_ = UNCOMPRESSED;
#endif
// Enforce aligned page size limits strictly by default.
g_config_value_.strict_page_size_ = true;
}

extern TSEncoding get_value_encoder(TSDataType data_type) {
Expand Down Expand Up @@ -104,6 +106,10 @@ void config_set_max_degree_of_index_node(uint32_t max_degree_of_index_node) {
g_config_value_.max_degree_of_index_node_ = max_degree_of_index_node;
}

void config_set_strict_page_size(bool strict_page_size) {
g_config_value_.strict_page_size_ = strict_page_size;
}

void set_config_value() {}
const char* s_data_type_names[8] = {"BOOLEAN", "INT32", "INT64", "FLOAT",
"DOUBLE", "TEXT", "VECTOR", "STRING"};
Expand Down
3 changes: 0 additions & 3 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
row_appender.append_null(1); \
continue; \
} \
assert(value_decoder_->has_remaining(value_in)); \
if (!value_decoder_->has_remaining(value_in)) { \
return common::E_DATA_INCONSISTENCY; \
} \
Expand Down Expand Up @@ -609,7 +608,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
row_appender.append_null(1);
continue;
}
assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return common::E_DATA_INCONSISTENCY;
}
Expand Down Expand Up @@ -695,7 +693,6 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
}

if (should_read_data) {
assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return E_DATA_INCONSISTENCY;
}
Expand Down
19 changes: 13 additions & 6 deletions cpp/src/reader/qds_without_timegenerator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,14 @@ int QDSWithoutTimeGenerator::next(bool& has_next) {

uint32_t len = 0;
uint32_t idx = heap_time_.begin()->second;
bool is_null_val = false;
auto val_datatype = value_iters_[idx]->get_data_type();
void* val_ptr = value_iters_[idx]->read(&len);
void* val_ptr = value_iters_[idx]->read(&len, &is_null_val);
if (!skip_row) {
row_record_->get_field(idx + 1)->set_value(val_datatype,
val_ptr, len, pa_);
if (!is_null_val) {
row_record_->get_field(idx + 1)->set_value(
val_datatype, val_ptr, len, pa_);
}
}
value_iters_[idx]->next();

Expand Down Expand Up @@ -219,10 +222,14 @@ int QDSWithoutTimeGenerator::next(bool& has_next) {
std::multimap<int64_t, uint32_t>::iterator iter = heap_time_.find(time);
for (uint32_t i = 0; i < count; ++i) {
uint32_t len = 0;
bool is_null_val = false;
auto val_datatype = value_iters_[iter->second]->get_data_type();
void* val_ptr = value_iters_[iter->second]->read(&len);
row_record_->get_field(iter->second + 1)
->set_value(val_datatype, val_ptr, len, pa_);
void* val_ptr =
value_iters_[iter->second]->read(&len, &is_null_val);
if (!is_null_val) {
row_record_->get_field(iter->second + 1)
->set_value(val_datatype, val_ptr, len, pa_);
}
value_iters_[iter->second]->next();
if (!time_iters_[iter->second]->end()) {
int64_t timev =
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/writer/time_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ int TimeChunkWriter::end_encode_chunk() {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
} else if (num_of_pages_ > 0) {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
#if DEBUG_SE
std::cout << "end_encode_time_chunk: num_of_pages_=" << num_of_pages_
Expand Down
34 changes: 32 additions & 2 deletions cpp/src/writer/time_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class TimeChunkWriter {
first_page_data_(),
first_page_statistic_(nullptr),
chunk_header_(),
num_of_pages_(0) {}
num_of_pages_(0),
enable_page_seal_if_full_(true) {}
~TimeChunkWriter() { destroy(); }
int init(const common::ColumnSchema& col_schema);
int init(const std::string& measurement_name, common::TSEncoding encoding,
Expand All @@ -57,8 +58,12 @@ class TimeChunkWriter {
if (RET_FAIL(time_page_writer_.write(timestamp))) {
return ret;
}
if (RET_FAIL(seal_cur_page_if_full())) {
if (UNLIKELY(!enable_page_seal_if_full_)) {
return ret;
} else {
if (RET_FAIL(seal_cur_page_if_full())) {
return ret;
}
}
return ret;
}
Expand All @@ -68,10 +73,33 @@ class TimeChunkWriter {
Statistic* get_chunk_statistic() { return chunk_statistic_; }
FORCE_INLINE int32_t num_of_pages() const { return num_of_pages_; }

// Current (unsealed) page point count.
FORCE_INLINE uint32_t get_point_numer() const {
return time_page_writer_.get_point_numer();
}

int64_t estimate_max_series_mem_size();

bool hasData();

/** True if the current (unsealed) page has at least one point. */
bool has_current_page_data() const {
return time_page_writer_.get_point_numer() > 0;
}

/**
* Force seal the current page (for aligned model: when any aligned page
* seals due to memory/point threshold, all pages must seal together).
* @return E_OK on success.
*/
int seal_current_page() { return seal_cur_page(false); }

// For aligned writer: allow disabling the automatic page-size/point-number
// check so the caller can seal pages at chosen boundaries.
FORCE_INLINE void set_enable_page_seal_if_full(bool enable) {
enable_page_seal_if_full_ = enable;
}

private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
Expand Down Expand Up @@ -110,6 +138,8 @@ class TimeChunkWriter {

ChunkHeader chunk_header_;
int32_t num_of_pages_;
// If false, write() won't auto-seal when the current page becomes full.
bool enable_page_seal_if_full_;
};

} // end namespace storage
Expand Down
Loading
Loading