Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions olp-cpp-sdk-dataservice-read/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,24 @@ set(DESCRIPTION "C++ API library for reading OLP data")
file(GLOB_RECURSE INC "include/*.h*")
file(GLOB_RECURSE SRC "src/*.*")


find_package(Boost REQUIRED)

add_library(${PROJECT_NAME}
${SRC}
${INC})

target_compile_definitions(${PROJECT_NAME}
PRIVATE
BOOST_ALL_NO_LIB
BOOST_JSON_NO_LIB)

target_include_directories(${PROJECT_NAME}
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
PRIVATE $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src>
$<BUILD_INTERFACE:${Boost_INCLUDE_DIR}>
PRIVATE ${olp-cpp-sdk-core_INCLUDE_DIRS})

target_link_libraries(${PROJECT_NAME}
Expand Down
44 changes: 24 additions & 20 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2025 HERE Europe B.V.
* Copyright (C) 2023-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,38 +24,43 @@ namespace dataservice {
namespace read {
namespace repository {

RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
JsonByteStream::Ch JsonByteStream::Peek() {
if (ReadEmpty()) {
SwapBuffers();
}
return read_buffer_[count_];
}

RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
boost::json::string_view JsonByteStream::ReadView() {
if (ReadEmpty()) {
SwapBuffers();
}
auto begin = read_buffer_.begin() + count_;
auto terminator_it = std::find(begin, read_buffer_.end(), '\0');
boost::json::string_view::size_type size =
std::distance(begin, terminator_it);
count_ += size;
full_count_ += size;
return {&*begin, size};
}

JsonByteStream::Ch JsonByteStream::Take() {
if (ReadEmpty()) {
SwapBuffers();
}
full_count_++;
return read_buffer_[count_++];
}

size_t RapidJsonByteStream::Tell() const { return full_count_; }

// Not implemented
char* RapidJsonByteStream::PutBegin() { return 0; }
void RapidJsonByteStream::Put(char) {}
void RapidJsonByteStream::Flush() {}
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }
size_t JsonByteStream::Tell() const { return full_count_; }

bool RapidJsonByteStream::ReadEmpty() const {
return count_ == read_buffer_.size();
}
bool RapidJsonByteStream::WriteEmpty() const {
bool JsonByteStream::ReadEmpty() const { return count_ == read_buffer_.size(); }
bool JsonByteStream::WriteEmpty() const {
std::unique_lock<std::mutex> lock(mutex_);
return write_buffer_.empty();
}

void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
void JsonByteStream::AppendContent(const char* content, size_t length) {
std::unique_lock<std::mutex> lock(mutex_);

const auto buffer_size = write_buffer_.size();
Expand All @@ -65,7 +70,7 @@ void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
cv_.notify_one();
}

void RapidJsonByteStream::SwapBuffers() {
void JsonByteStream::SwapBuffers() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&]() { return !write_buffer_.empty(); });
std::swap(read_buffer_, write_buffer_);
Expand All @@ -74,10 +79,9 @@ void RapidJsonByteStream::SwapBuffers() {
}

AsyncJsonStream::AsyncJsonStream()
: current_stream_(std::make_shared<RapidJsonByteStream>()),
closed_{false} {}
: current_stream_(std::make_shared<JsonByteStream>()), closed_{false} {}

std::shared_ptr<RapidJsonByteStream> AsyncJsonStream::GetCurrentStream() const {
std::shared_ptr<JsonByteStream> AsyncJsonStream::GetCurrentStream() const {
std::unique_lock<std::mutex> lock(mutex_);
return current_stream_;
}
Expand All @@ -96,7 +100,7 @@ void AsyncJsonStream::ResetStream(const char* content, size_t length) {
return;
}
current_stream_->AppendContent("\0", 1);
current_stream_ = std::make_shared<RapidJsonByteStream>();
current_stream_ = std::make_shared<JsonByteStream>();
current_stream_->AppendContent(content, length);
}

Expand Down
18 changes: 8 additions & 10 deletions olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2023-2025 HERE Europe B.V.
* Copyright (C) 2023-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,14 +25,15 @@

#include <olp/core/client/ApiError.h>
#include <olp/core/porting/optional.h>
#include <boost/json/string_view.hpp>

namespace olp {
namespace dataservice {
namespace read {
namespace repository {

/// Json byte stream class. Implements rapidjson input stream concept.
class RapidJsonByteStream {
class JsonByteStream {
public:
typedef char Ch;

Expand All @@ -43,15 +44,12 @@ class RapidJsonByteStream {
/// character.
Ch Take();

/// Return the view of current read buffer until the end of first \0 character
boost::json::string_view ReadView();

/// Get the current read cursor.
size_t Tell() const;

/// Not needed for reading.
char* PutBegin();
void Put(char);
void Flush();
size_t PutEnd(char*);

bool ReadEmpty() const;
bool WriteEmpty() const;

Expand All @@ -72,7 +70,7 @@ class AsyncJsonStream {
public:
AsyncJsonStream();

std::shared_ptr<RapidJsonByteStream> GetCurrentStream() const;
std::shared_ptr<JsonByteStream> GetCurrentStream() const;

void AppendContent(const char* content, size_t length);

Expand All @@ -86,7 +84,7 @@ class AsyncJsonStream {

private:
mutable std::mutex mutex_;
std::shared_ptr<RapidJsonByteStream> current_stream_;
std::shared_ptr<JsonByteStream> current_stream_;
porting::optional<client::ApiError> error_;
bool closed_;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2024 HERE Europe B.V.
* Copyright (C) 2019-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
#include <olp/core/client/Condition.h>
#include <olp/core/logging/Log.h>
#include <boost/functional/hash.hpp>
#include <boost/json/basic_parser_impl.hpp>
#include "CatalogRepository.h"
#include "generated/api/MetadataApi.h"
#include "generated/api/QueryApi.h"
Expand Down Expand Up @@ -648,37 +649,44 @@ client::ApiNoResponse PartitionsRepository::ParsePartitionsStream(
const std::shared_ptr<AsyncJsonStream>& async_stream,
const PartitionsStreamCallback& partition_callback,
client::CancellationContext context) {
rapidjson::ParseResult parse_result;
auto parse_result =
boost::json::make_error_code(boost::json::error::incomplete);

// We must perform at least one attempt to parse.
do {
rapidjson::Reader reader;
auto partitions_handler =
std::make_shared<repository::PartitionsSaxHandler>(partition_callback);

auto reader_cancellation_token = client::CancellationToken([=]() {
partitions_handler->Abort();
async_stream->CloseStream(client::ApiError::Cancelled());
});

if (!context.ExecuteOrCancelled(
[=]() { return reader_cancellation_token; })) {
auto parser =
std::make_shared<boost::json::basic_parser<PartitionsSaxHandler>>(
boost::json::parse_options{}, partition_callback);

auto reader_cancellation_token =
client::CancellationToken([parser, &async_stream]() {
parser->handler().Abort();
async_stream->CloseStream(client::ApiError::Cancelled());
});

if (!context.ExecuteOrCancelled([reader_cancellation_token]() {
return reader_cancellation_token;
})) {
return client::ApiError::Cancelled();
}

auto json_stream = async_stream->GetCurrentStream();

parse_result = reader.Parse<rapidjson::kParseIterativeFlag>(
*json_stream, *partitions_handler);
while (json_stream->Peek() != '\0') {
auto view = json_stream->ReadView();
if (parser->write_some(true, view.data(), view.size(), parse_result)) {
parse_result = {};
}
}
// Retry to parse the stream until it's closed.
} while (!async_stream->IsClosed());

auto error = async_stream->GetError();

if (error) {
return {*error};
} else if (!parse_result) {
return client::ApiError(parse_result.Code(), "Parsing error");
} else if (parse_result.failed()) {
return client::ApiError(parse_result.value(), "Parsing error");
} else {
return client::ApiNoResult{};
}
Expand Down
Loading
Loading