Skip to content

Commit 72817de

Browse files
committed
implement streaming read/write type info
1 parent 6ac8093 commit 72817de

29 files changed

Lines changed: 521 additions & 828 deletions

File tree

cpp/fory/serialization/context.cc

Lines changed: 111 additions & 147 deletions
Original file line number | Diff line number | Diff line change
@@ -64,46 +64,33 @@ WriteContext::WriteContext(const Config &config,
6464

6565
WriteContext::~WriteContext() = default;
6666

67-
Result<size_t, Error> WriteContext::push_meta(const std::type_index &type_id) {
68-
auto it = write_type_id_index_map_.find(type_id);
69-
if (it != write_type_id_index_map_.end()) {
70-
return it->second;
71-
}
72-
73-
size_t index = write_type_defs_.size();
67+
Result<void, Error>
68+
WriteContext::write_type_meta(const std::type_index &type_id) {
69+
// Resolve type_index to TypeInfo* and delegate to the TypeInfo* version
70+
// This ensures consistent indexing when the same type is written via
71+
// either type_index or TypeInfo* path
7472
FORY_TRY(type_info, type_resolver_->get_type_info(type_id));
75-
write_type_defs_.push_back(type_info->type_def);
76-
write_type_id_index_map_[type_id] = index;
77-
return index;
73+
write_type_meta(type_info);
74+
return Result<void, Error>();
7875
}
7976

80-
size_t WriteContext::push_meta(const TypeInfo *type_info) {
77+
void WriteContext::write_type_meta(const TypeInfo *type_info) {
8178
auto it = write_type_info_index_map_.find(type_info);
8279
if (it != write_type_info_index_map_.end()) {
83-
return it->second;
80+
// Reference to previously written type: (index << 1) | 1, LSB=1
81+
buffer_.WriteVarUint32(static_cast<uint32_t>((it->second << 1) | 1));
82+
return;
8483
}
8584

86-
size_t index = write_type_defs_.size();
87-
write_type_defs_.push_back(type_info->type_def);
85+
// New type: index << 1, LSB=0, followed by TypeDef bytes inline
86+
size_t index = write_type_info_index_map_.size();
87+
buffer_.WriteVarUint32(static_cast<uint32_t>(index << 1));
8888
write_type_info_index_map_[type_info] = index;
89-
return index;
90-
}
9189

92-
void WriteContext::write_meta(size_t offset) {
93-
size_t current_pos = buffer_.writer_index();
94-
// Update the meta offset field (written as -1 initially)
95-
int32_t meta_size = static_cast<int32_t>(current_pos - offset - 4);
96-
buffer_.UnsafePut<int32_t>(offset, meta_size);
97-
// Write all collected TypeMetas
98-
buffer_.WriteVarUint32(static_cast<uint32_t>(write_type_defs_.size()));
99-
for (size_t i = 0; i < write_type_defs_.size(); ++i) {
100-
const auto &type_def = write_type_defs_[i];
101-
buffer_.WriteBytes(type_def.data(), type_def.size());
102-
}
90+
// Write TypeDef bytes inline
91+
buffer_.WriteBytes(type_info->type_def.data(), type_info->type_def.size());
10392
}
10493

105-
bool WriteContext::meta_empty() const { return write_type_defs_.empty(); }
106-
10794
/// Write pre-encoded meta string to buffer (avoids re-encoding on each write)
10895
static void write_encoded_meta_string(Buffer &buffer,
10996
const CachedMetaString &encoded) {
@@ -134,9 +121,8 @@ WriteContext::write_enum_typeinfo(const std::type_index &type) {
134121

135122
if (type_id_low == static_cast<uint32_t>(TypeId::NAMED_ENUM)) {
136123
if (config_->compatible) {
137-
// Write meta_index
138-
FORY_TRY(meta_index, push_meta(type));
139-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
124+
// Write type meta inline using streaming protocol
125+
FORY_RETURN_NOT_OK(write_type_meta(type));
140126
} else {
141127
// Write pre-encoded namespace and type_name
142128
if (type_info->encoded_namespace && type_info->encoded_type_name) {
@@ -166,9 +152,8 @@ WriteContext::write_enum_typeinfo(const TypeInfo *type_info) {
166152

167153
if (type_id_low == static_cast<uint32_t>(TypeId::NAMED_ENUM)) {
168154
if (config_->compatible) {
169-
// Write meta_index using TypeInfo pointer (fast path)
170-
size_t meta_index = push_meta(type_info);
171-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
155+
// Write type meta inline using streaming protocol
156+
write_type_meta(type_info);
172157
} else {
173158
// Write pre-encoded namespace and type_name
174159
if (type_info->encoded_namespace && type_info->encoded_type_name) {
@@ -207,18 +192,16 @@ WriteContext::write_any_typeinfo(uint32_t fory_type_id,
207192
switch (type_id_low) {
208193
case static_cast<uint32_t>(TypeId::NAMED_COMPATIBLE_STRUCT):
209194
case static_cast<uint32_t>(TypeId::COMPATIBLE_STRUCT): {
210-
// Write meta_index
211-
FORY_TRY(meta_index, push_meta(concrete_type_id));
212-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
195+
// Write type meta inline using streaming protocol
196+
FORY_RETURN_NOT_OK(write_type_meta(concrete_type_id));
213197
break;
214198
}
215199
case static_cast<uint32_t>(TypeId::NAMED_ENUM):
216200
case static_cast<uint32_t>(TypeId::NAMED_EXT):
217201
case static_cast<uint32_t>(TypeId::NAMED_STRUCT): {
218202
if (config_->compatible) {
219-
// Write meta_index (share_meta is effectively compatible in C++)
220-
FORY_TRY(meta_index, push_meta(concrete_type_id));
221-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
203+
// Write type meta inline using streaming protocol
204+
FORY_RETURN_NOT_OK(write_type_meta(concrete_type_id));
222205
} else {
223206
// Write pre-encoded namespace and type_name
224207
if (type_info->encoded_namespace && type_info->encoded_type_name) {
@@ -253,16 +236,14 @@ WriteContext::write_struct_type_info(const std::type_index &type_id) {
253236
switch (type_id_low) {
254237
case static_cast<uint32_t>(TypeId::NAMED_COMPATIBLE_STRUCT):
255238
case static_cast<uint32_t>(TypeId::COMPATIBLE_STRUCT): {
256-
// Write meta_index
257-
FORY_TRY(meta_index, push_meta(type_id));
258-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
239+
// Write type meta inline using streaming protocol
240+
FORY_RETURN_NOT_OK(write_type_meta(type_id));
259241
break;
260242
}
261243
case static_cast<uint32_t>(TypeId::NAMED_STRUCT): {
262244
if (config_->compatible) {
263-
// Write meta_index
264-
FORY_TRY(meta_index, push_meta(type_id));
265-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
245+
// Write type meta inline using streaming protocol
246+
FORY_RETURN_NOT_OK(write_type_meta(type_id));
266247
} else {
267248
// Write pre-encoded namespace and type_name
268249
if (type_info->encoded_namespace && type_info->encoded_type_name) {
@@ -295,16 +276,14 @@ WriteContext::write_struct_type_info(const TypeInfo *type_info) {
295276
switch (type_id_low) {
296277
case static_cast<uint32_t>(TypeId::NAMED_COMPATIBLE_STRUCT):
297278
case static_cast<uint32_t>(TypeId::COMPATIBLE_STRUCT): {
298-
// Write meta_index using TypeInfo pointer (fast path)
299-
size_t meta_index = push_meta(type_info);
300-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
279+
// Write type meta inline using streaming protocol
280+
write_type_meta(type_info);
301281
break;
302282
}
303283
case static_cast<uint32_t>(TypeId::NAMED_STRUCT): {
304284
if (config_->compatible) {
305-
// Write meta_index using TypeInfo pointer (fast path)
306-
size_t meta_index = push_meta(type_info);
307-
buffer_.WriteVarUint32(static_cast<uint32_t>(meta_index));
285+
// Write type meta inline using streaming protocol
286+
write_type_meta(type_info);
308287
} else {
309288
// Write pre-encoded namespace and type_name
310289
if (type_info->encoded_namespace && type_info->encoded_type_name) {
@@ -329,10 +308,7 @@ void WriteContext::reset() {
329308
// Clear error state first
330309
error_ = Error();
331310
ref_writer_.reset();
332-
// Clear meta vectors/maps - they're typically small or empty
333-
// in non-compatible mode, so clear() is efficient
334-
write_type_defs_.clear();
335-
write_type_id_index_map_.clear();
311+
// Clear meta map for streaming TypeMeta (size is used as counter)
336312
write_type_info_index_map_.clear();
337313
current_dyn_depth_ = 0;
338314
// Reset buffer indices for reuse - no memory operations needed
@@ -385,100 +361,94 @@ ReadContext::read_enum_type_info(uint32_t base_type_id) {
385361
// Maximum number of parsed type defs to cache (avoid OOM from malicious input)
386362
static constexpr size_t kMaxParsedNumTypeDefs = 8192;
387363

388-
Result<size_t, Error> ReadContext::load_type_meta(int32_t meta_offset) {
389-
size_t current_pos = buffer_->reader_index();
390-
size_t meta_start = current_pos + meta_offset;
391-
buffer_->ReaderIndex(static_cast<uint32_t>(meta_start));
392-
393-
// Load all TypeMetas
364+
Result<const TypeInfo *, Error> ReadContext::read_type_meta() {
394365
Error error;
395-
uint32_t meta_size = buffer_->ReadVarUint32(error);
366+
// Read the index marker
367+
uint32_t index_marker = buffer_->ReadVarUint32(error);
396368
if (FORY_PREDICT_FALSE(!error.ok())) {
397369
return Unexpected(std::move(error));
398370
}
399-
reading_type_infos_.reserve(meta_size);
400371

401-
for (uint32_t i = 0; i < meta_size; i++) {
402-
// Read the 8-byte header first for caching
403-
int64_t meta_header = buffer_->ReadInt64(error);
404-
if (FORY_PREDICT_FALSE(!error.ok())) {
405-
return Unexpected(std::move(error));
406-
}
372+
bool is_ref = (index_marker & 1) == 1;
373+
size_t index = index_marker >> 1;
407374

408-
// Check if we already parsed this type meta (cache lookup by header)
409-
auto cache_it = parsed_type_infos_.find(meta_header);
410-
if (cache_it != parsed_type_infos_.end()) {
411-
// Found in cache - reuse and skip the bytes
412-
reading_type_infos_.push_back(cache_it->second);
413-
FORY_RETURN_NOT_OK(TypeMeta::skip_bytes(*buffer_, meta_header));
414-
continue;
415-
}
375+
if (is_ref) {
376+
// Reference to previously read type
377+
return get_type_info_by_index(index);
378+
}
416379

417-
// Not in cache - parse the TypeMeta
418-
FORY_TRY(parsed_meta,
419-
TypeMeta::from_bytes_with_header(*buffer_, meta_header));
420-
421-
// Find local TypeInfo to get field_id mapping (optional for schema
422-
// evolution)
423-
const TypeInfo *local_type_info = nullptr;
424-
if (parsed_meta->register_by_name) {
425-
auto result = type_resolver_->get_type_info_by_name(
426-
parsed_meta->namespace_str, parsed_meta->type_name);
427-
if (result.ok()) {
428-
local_type_info = result.value();
429-
}
430-
} else {
431-
auto result = type_resolver_->get_type_info_by_id(parsed_meta->type_id);
432-
if (result.ok()) {
433-
local_type_info = result.value();
434-
}
435-
}
380+
// New type - read TypeMeta inline
381+
// Read the 8-byte header first for caching
382+
int64_t meta_header = buffer_->ReadInt64(error);
383+
if (FORY_PREDICT_FALSE(!error.ok())) {
384+
return Unexpected(std::move(error));
385+
}
436386

437-
// Create TypeInfo with field_ids assigned
438-
auto type_info = std::make_unique<TypeInfo>();
439-
if (local_type_info) {
440-
// Have local type - assign field_ids by comparing schemas
441-
// Note: Extension types don't have type_meta (only structs do)
442-
if (local_type_info->type_meta) {
443-
TypeMeta::assign_field_ids(local_type_info->type_meta.get(),
444-
parsed_meta->field_infos);
445-
}
446-
type_info->type_id = local_type_info->type_id;
447-
type_info->type_meta = std::move(parsed_meta);
448-
type_info->type_def = local_type_info->type_def;
449-
// CRITICAL: Copy the harness from the registered type_info
450-
type_info->harness = local_type_info->harness;
451-
type_info->name_to_index = local_type_info->name_to_index;
452-
type_info->namespace_name = local_type_info->namespace_name;
453-
type_info->type_name = local_type_info->type_name;
454-
type_info->register_by_name = local_type_info->register_by_name;
455-
} else {
456-
// No local type - create stub TypeInfo with parsed meta
457-
type_info->type_id = parsed_meta->type_id;
458-
type_info->type_meta = std::move(parsed_meta);
387+
// Check if we already parsed this type meta (cache lookup by header)
388+
auto cache_it = parsed_type_infos_.find(meta_header);
389+
if (cache_it != parsed_type_infos_.end()) {
390+
// Found in cache - reuse and skip the bytes
391+
reading_type_infos_.push_back(cache_it->second);
392+
FORY_RETURN_NOT_OK(TypeMeta::skip_bytes(*buffer_, meta_header));
393+
return cache_it->second;
394+
}
395+
396+
// Not in cache - parse the TypeMeta
397+
FORY_TRY(parsed_meta,
398+
TypeMeta::from_bytes_with_header(*buffer_, meta_header));
399+
400+
// Find local TypeInfo to get field_id mapping (optional for schema evolution)
401+
const TypeInfo *local_type_info = nullptr;
402+
if (parsed_meta->register_by_name) {
403+
auto result = type_resolver_->get_type_info_by_name(
404+
parsed_meta->namespace_str, parsed_meta->type_name);
405+
if (result.ok()) {
406+
local_type_info = result.value();
407+
}
408+
} else {
409+
auto result = type_resolver_->get_type_info_by_id(parsed_meta->type_id);
410+
if (result.ok()) {
411+
local_type_info = result.value();
459412
}
413+
}
460414

461-
// Get raw pointer before moving into storage
462-
const TypeInfo *raw_ptr = type_info.get();
415+
// Create TypeInfo with field_ids assigned
416+
auto type_info = std::make_unique<TypeInfo>();
417+
if (local_type_info) {
418+
// Have local type - assign field_ids by comparing schemas
419+
// Note: Extension types don't have type_meta (only structs do)
420+
if (local_type_info->type_meta) {
421+
TypeMeta::assign_field_ids(local_type_info->type_meta.get(),
422+
parsed_meta->field_infos);
423+
}
424+
type_info->type_id = local_type_info->type_id;
425+
type_info->type_meta = std::move(parsed_meta);
426+
type_info->type_def = local_type_info->type_def;
427+
// CRITICAL: Copy the harness from the registered type_info
428+
type_info->harness = local_type_info->harness;
429+
type_info->name_to_index = local_type_info->name_to_index;
430+
type_info->namespace_name = local_type_info->namespace_name;
431+
type_info->type_name = local_type_info->type_name;
432+
type_info->register_by_name = local_type_info->register_by_name;
433+
} else {
434+
// No local type - create stub TypeInfo with parsed meta
435+
type_info->type_id = parsed_meta->type_id;
436+
type_info->type_meta = std::move(parsed_meta);
437+
}
463438

464-
// Store in primary storage
465-
owned_reading_type_infos_.push_back(std::move(type_info));
439+
// Get raw pointer before moving into storage
440+
const TypeInfo *raw_ptr = type_info.get();
466441

467-
// Cache the parsed TypeInfo (with size limit to prevent OOM)
468-
if (parsed_type_infos_.size() < kMaxParsedNumTypeDefs) {
469-
parsed_type_infos_[meta_header] = raw_ptr;
470-
}
442+
// Store in primary storage
443+
owned_reading_type_infos_.push_back(std::move(type_info));
471444

472-
reading_type_infos_.push_back(raw_ptr);
445+
// Cache the parsed TypeInfo (with size limit to prevent OOM)
446+
if (parsed_type_infos_.size() < kMaxParsedNumTypeDefs) {
447+
parsed_type_infos_[meta_header] = raw_ptr;
473448
}
474449

475-
// Calculate size of meta section
476-
size_t meta_end = buffer_->reader_index();
477-
size_t meta_section_size = meta_end - meta_start;
478-
479-
// Restore buffer position
480-
buffer_->ReaderIndex(static_cast<uint32_t>(current_pos));
481-
return meta_section_size;
450+
reading_type_infos_.push_back(raw_ptr);
451+
return raw_ptr;
482452
}
483453

484454
Result<const TypeInfo *, Error>
@@ -499,25 +469,19 @@ Result<const TypeInfo *, Error> ReadContext::read_any_typeinfo() {
499469
}
500470
uint32_t type_id_low = type_id & 0xff;
501471

502-
// Mirror Rust's read_any_typeinfo using switch for jump table generation
472+
// Use streaming protocol for type meta
503473
switch (type_id_low) {
504474
case static_cast<uint32_t>(TypeId::NAMED_COMPATIBLE_STRUCT):
505475
case static_cast<uint32_t>(TypeId::COMPATIBLE_STRUCT): {
506-
uint32_t meta_index = buffer_->ReadVarUint32(error);
507-
if (FORY_PREDICT_FALSE(!error.ok())) {
508-
return Unexpected(std::move(error));
509-
}
510-
return get_type_info_by_index(meta_index);
476+
// Read type meta inline using streaming protocol
477+
return read_type_meta();
511478
}
512479
case static_cast<uint32_t>(TypeId::NAMED_ENUM):
513480
case static_cast<uint32_t>(TypeId::NAMED_EXT):
514481
case static_cast<uint32_t>(TypeId::NAMED_STRUCT): {
515482
if (config_->compatible) {
516-
uint32_t meta_index = buffer_->ReadVarUint32(error);
517-
if (FORY_PREDICT_FALSE(!error.ok())) {
518-
return Unexpected(std::move(error));
519-
}
520-
return get_type_info_by_index(meta_index);
483+
// Read type meta inline using streaming protocol
484+
return read_type_meta();
521485
}
522486
FORY_TRY(namespace_str,
523487
meta_string_table_.read_string(*buffer_, kNamespaceDecoder));

0 commit comments

Comments
 (0)