// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "cloud/cloud_rowset_writer.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/fs/packed_file_manager.h"
#include "io/fs/packed_file_writer.h"
#include "storage/rowset/rowset_factory.h"
namespace doris {
CloudRowsetWriter::CloudRowsetWriter(CloudStorageEngine& engine) : _engine(engine) {}

CloudRowsetWriter::~CloudRowsetWriter() {
    // Must cancel any pending delete bitmap tasks before destruction.
    // Otherwise, the lambda in _generate_delete_bitmap may execute after the
    // CloudRowsetWriter destructor runs but before the BaseBetaRowsetWriter destructor,
    // causing virtual function calls to resolve to BaseBetaRowsetWriter::_build_rowset_meta
    // instead of CloudRowsetWriter::_build_rowset_meta (use-after-free on vtable).
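    // A sketch of the hazardous interleaving this guards against (illustrative,
    // not from an actual trace):
    //   1. ~CloudRowsetWriter() finishes; the object's dynamic type reverts to
    //      BaseBetaRowsetWriter while destruction is still in progress.
    //   2. A queued delete-bitmap task fires and calls _build_rowset_meta()
    //      through `this`, now dispatching to the base-class implementation.
    //   3. Once ~BaseBetaRowsetWriter() also completes, the same call is a
    //      plain use-after-free.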
    if (_calc_delete_bitmap_token != nullptr) {
        _calc_delete_bitmap_token->cancel();
    }
}

Status CloudRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    _context = rowset_writer_context;
    _rowset_meta = std::make_shared<RowsetMeta>();
    if (_context.is_local_rowset()) {
        // In cloud mode, this branch implies an intermediate rowset for external merge sort;
        // we use `global_local_filesystem` to write data to `tmp_file_dir` (see `local_segment_path`).
        _context.tablet_path = io::FileCacheFactory::instance()->pick_one_cache_path();
    } else {
        _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
    }
    _rowset_meta->set_rowset_id(_context.rowset_id);
    _rowset_meta->set_partition_id(_context.partition_id);
    _rowset_meta->set_tablet_id(_context.tablet_id);
    _rowset_meta->set_index_id(_context.index_id);
    _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
    _rowset_meta->set_rowset_type(_context.rowset_type);
    _rowset_meta->set_rowset_state(_context.rowset_state);
    _rowset_meta->set_segments_overlap(_context.segments_overlap);
    _rowset_meta->set_txn_id(_context.txn_id);
    _rowset_meta->set_txn_expiration(_context.txn_expiration);
    _rowset_meta->set_compaction_level(_context.compaction_level);
    if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
        _is_pending = true;
        _rowset_meta->set_load_id(_context.load_id);
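        // (The version of a pending load rowset is assigned later, when the
        // transaction is published, so it is not set on this branch.)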
    } else {
        // Rowset generated by compaction or schema change
        _rowset_meta->set_version(_context.version);
        DCHECK_NE(_context.newest_write_timestamp, -1);
        _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
    }
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
    _rowset_meta->set_job_id(_context.job_id);
    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
    if (_context.mow_context != nullptr) {
        _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token();
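        // This token is cancelled in ~CloudRowsetWriter() so that queued
        // delete-bitmap tasks never outlive this writer (see the destructor).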
    }
    return Status::OK();
}

Status CloudRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
    VLOG_NOTICE << "Start building rowset meta. tablet_id=" << rowset_meta->tablet_id()
                << ", rowset_id=" << rowset_meta->rowset_id()
                << ", check_segment_num=" << check_segment_num;
    // Call base class implementation
    RETURN_IF_ERROR(BaseBetaRowsetWriter::_build_rowset_meta(rowset_meta, check_segment_num));
    // Collect packed file segment index information for interim rowsets as well.
    return _collect_all_packed_slice_locations(rowset_meta);
}

Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
    if (_calc_delete_bitmap_token != nullptr) {
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
    }
    RETURN_IF_ERROR(_close_file_writers());
    // TODO(plat1ko): check_segment_footer
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get()));
    // At this point all writers have been closed, so collecting packed file indices is safe.
    RETURN_IF_ERROR(_collect_all_packed_slice_locations(_rowset_meta.get()));
    // If the current load is a partial update, new segments may be appended to the tmp rowset
    // after it has been committed, when conflicts arise from concurrent partial updates. However,
    // when the recycler does recycling, it generates the paths of the segments to be recycled on
    // object storage based on the number of segments recorded in the rowset meta. If these newly
    // added segments are written to object storage and the transaction is aborted due to a failure
    // before the rowset meta of the corresponding tmp rowset is successfully updated, the new
    // segments cannot be recycled by the recycler. Therefore, we need a new state
    // `BEGIN_PARTIAL_UPDATE` to indicate that the recycler should use list+delete to recycle
    // segments. After the tmp rowset's rowset meta has been updated successfully, `rowset_state`
    // is set to `COMMITTED` and the recycler can safely recycle based on the number of segments
    // in the rowset meta.
    //
    // rowset_state's FSM:
    //
    //                     transfer 0
    //   PREPARED ---------------------------> COMMITTED
    //      |                                      ^
    //      | transfer 1                           |
    //      |                                      | transfer 2
    //      |--> BEGIN_PARTIAL_UPDATE -------------|
    //
    // transfer 0 (PREPARED -> COMMITTED): finished writing a rowset whose meta will not
    //     change afterwards
    // transfer 1 (PREPARED -> BEGIN_PARTIAL_UPDATE): finished writing a rowset, but new
    //     segments may be appended later and the rowset's meta may change
    // transfer 2 (BEGIN_PARTIAL_UPDATE -> COMMITTED): finished adding new segments; the
    //     rowset's meta will not change afterwards
    if (_context.partial_update_info && _context.partial_update_info->is_partial_update()) {
        _rowset_meta->set_rowset_state(BEGIN_PARTIAL_UPDATE);
    } else {
        _rowset_meta->set_rowset_state(COMMITTED);
    }
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
    if (_rowset_meta->newest_write_timestamp() == -1) {
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
    }
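    // The segment-size and index-size stats below are best-effort: on failure we
    // only log an error and keep building the rowset without them.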
    if (auto seg_file_size = _seg_files.segments_file_size(_segment_start_id);
        !seg_file_size.has_value()) [[unlikely]] {
        LOG(ERROR) << "expected segment file sizes, but none present: " << seg_file_size.error();
    } else {
        _rowset_meta->add_segments_file_size(seg_file_size.value());
    }
    if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) {
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
            !idx_files_info.has_value()) [[unlikely]] {
            LOG(ERROR) << "expected inverted index files info, but none present: "
                       << idx_files_info.error();
        } else {
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
        }
    }
    RETURN_NOT_OK_STATUS_WITH_WARN(
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
                                         &rowset),
            "rowset init failed when building new rowset");
    _already_built = true;
    return Status::OK();
}

Status CloudRowsetWriter::_collect_all_packed_slice_locations(RowsetMeta* rowset_meta) {
    VLOG_NOTICE << "Start collecting packed slice locations for rowset meta. tablet_id="
                << rowset_meta->tablet_id() << ", rowset_id=" << rowset_meta->rowset_id();
    if (!_context.packed_file_active) {
        return Status::OK();
    }
    // Collect segment file packed indices
    const auto& file_writers = _seg_files.get_file_writers();
    for (const auto& [seg_id, writer_ptr] : file_writers) {
        auto segment_path = _context.segment_path(seg_id);
        RETURN_IF_ERROR(
                _collect_packed_slice_location(writer_ptr.get(), segment_path, rowset_meta));
    }
    // Collect inverted index file packed indices
    const auto& idx_file_writers = _idx_files.get_file_writers();
    for (const auto& [seg_id, idx_writer_ptr] : idx_file_writers) {
        if (idx_writer_ptr != nullptr && idx_writer_ptr->get_file_writer() != nullptr) {
            auto segment_path = _context.segment_path(seg_id);
            auto index_prefix_view =
                    InvertedIndexDescriptor::get_index_file_path_prefix(segment_path);
            std::string index_path =
                    InvertedIndexDescriptor::get_index_file_path_v2(std::string(index_prefix_view));
            RETURN_IF_ERROR(_collect_packed_slice_location(idx_writer_ptr->get_file_writer(),
                                                           index_path, rowset_meta));
        }
    }
    return Status::OK();
}

Status CloudRowsetWriter::_collect_packed_slice_location(io::FileWriter* file_writer,
                                                         const std::string& file_path,
                                                         RowsetMeta* rowset_meta) {
    VLOG_NOTICE << "collect packed slice location for file: " << file_path;
    // Check whether the file writer is closed.
    if (file_writer->state() != io::FileWriter::State::CLOSED) {
        // Writer is still open; its index will be collected after it is closed.
        return Status::OK();
    }
    // Skip files that were written directly rather than into a packed file
    // (large files bypass packing).
    if (!file_writer->is_in_packed_file()) {
        return Status::OK();
    }
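    // Illustrative packed-file layout (hypothetical names and offsets): many small
    // files are concatenated into a single object, and each one is addressed by a
    // slice of that object, e.g.
    //   packed_00001.dat: [seg_0.dat @ 0, len 4096][seg_0.idx @ 4096, len 1024]...
    // so a slice location is the tuple (packed_file_path, offset, size).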
    // Get packed slice location directly from PackedFileManager
    io::PackedSliceLocation index;
    RETURN_IF_ERROR(
            io::PackedFileManager::instance()->get_packed_slice_location(file_path, &index));
    if (index.packed_file_path.empty()) {
        return Status::OK(); // File not in packed file, skip
    }
    rowset_meta->add_packed_slice_location(file_path, index.packed_file_path, index.offset,
                                           index.size, index.packed_file_size);
    LOG(INFO) << "collect packed file index: " << file_path << " -> " << index.packed_file_path
              << ", offset: " << index.offset << ", size: " << index.size;
    return Status::OK();
}

} // namespace doris