Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.5.9"
version = "7.5.10"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
9 changes: 9 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ table Consensus {
// Log frequency for gc_repl_reqs messages (log every N times)
// 60 * repl_dev_cleanup_interval_sec (60s) means every 1 hour we will log the gc repl reqs info.
gc_repl_reqs_log_frequency: uint32 = 60 (hotswap);

// Controls whether senders compute and attach a CRC32 checksum to pushdata/fetchdata payloads.
// The receiver always skips CRC verification when the checksum field is zero, so this flag only
// affects the send side. Safe to enable via hotswap once all nodes in the cluster are running a
// version that can parse the FetchDataResponse FlatBuffer header; senders that still emit raw
// block data (no header) carry no checksum, and receivers fall back gracefully to the unverified path.
// Not enabled by default: TCP already detects partial-transfer corruption; CRC is opt-in for
// environments requiring additional data-integrity guarantees.
data_checksum_enabled: bool = false (hotswap);
}

table HomeStoreSettings {
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/fetch_data_rpc.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ table ResponseEntry {
dsn : uint64; // Data Sequence number
raft_term : uint64; // Raft term number
data_size : uint32; // Size of the data which is sent as separate non flatbuffer
checksum: uint32; // CRC32 over the data for this entry; 0 when checksum is disabled
}

table FetchDataResponse {
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/push_data_rpc.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ table PushDataRequest {
user_key : [ubyte]; // User key data
data_size : uint32; // Data size, actual data is sent as separate blob not by flatbuffer
time_ms: uint64; // time point when originator pushed this request;
checksum: uint32; // CRC32 over the data payload; 0 when checksum is disabled
}

root_type PushDataRequest;
166 changes: 154 additions & 12 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1097,11 +1097,24 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list const& data) {
auto& builder = rreq->create_fb_builder();

// Compute CRC32 over the data payload before serialising so the follower can detect corruption in transit.
// checksum=0 is the sentinel for "not computed"; the receiver skips verification when it sees 0.
// A legitimately computed CRC of 0 (~1 in 4B) is indistinguishable from the sentinel and will
// also skip verification for that packet — an accepted limitation of this design.
uint32_t checksum = 0;
if (HS_DYNAMIC_CONFIG(consensus.data_checksum_enabled)) {
checksum = init_crc32;
for (auto const& iov : data.iovs) {
checksum = crc32_ieee(checksum, r_cast< const unsigned char* >(iov.iov_base), iov.iov_len);
}
}

// Prepare the rpc request packet with all repl_reqs details
builder.FinishSizePrefixed(CreatePushDataRequest(
builder, rreq->traceID(), server_id(), rreq->term(), rreq->dsn(),
builder.CreateVector(rreq->header().cbytes(), rreq->header().size()),
builder.CreateVector(rreq->key().cbytes(), rreq->key().size()), data.size, get_time_since_epoch_ms()));
builder.CreateVector(rreq->key().cbytes(), rreq->key().size()), data.size, get_time_since_epoch_ms(),
checksum));

rreq->m_pkts = sisl::io_blob::sg_list_to_ioblob_list(data);
rreq->m_pkts.insert(rreq->m_pkts.begin(), sisl::io_blob{builder.GetBufferPointer(), builder.GetSize(), false});
Expand Down Expand Up @@ -1145,8 +1158,15 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d
return;
}

auto const fb_size =
flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) + sizeof(flatbuffers::uoffset_t);
auto const fb_size = static_cast< uint64_t >(
flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes())) +
sizeof(flatbuffers::uoffset_t);
flatbuffers::Verifier push_verifier{incoming_buf.cbytes(), static_cast< size_t >(fb_size)};
if (!push_verifier.VerifySizePrefixedBuffer< PushDataRequest >(nullptr)) {
RD_LOGW(NO_TRACE_ID, "Data Channel: PushData FlatBuffer verification failed, ignoring");
rpc_data->send_response();
return;
}
auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes());
if (fb_size + push_req->data_size() != incoming_buf.size()) {
RD_LOGW(NO_TRACE_ID,
Expand All @@ -1165,6 +1185,20 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d

RD_LOGD(rkey.traceID, "Data Channel: PushData received: time diff={} ms.", get_elapsed_time_ms(req_orig_time_ms));

if (push_req->checksum() != 0) {
auto const data_ptr = r_cast< const unsigned char* >(incoming_buf.cbytes() + fb_size);
auto const computed = crc32_ieee(init_crc32, data_ptr, push_req->data_size());
if (computed != push_req->checksum()) {
COUNTER_INCREMENT(m_metrics, data_checksum_mismatch_cnt, 1);
RD_LOGE(rkey.traceID,
"Data Channel: PushData checksum mismatch dsn={}, expected={:#010x}, computed={:#010x}, dropping "
"(follower will fetch from remote on next Raft retry)",
push_req->dsn(), push_req->checksum(), computed);
rpc_data->send_response();
return;
}
}

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("drop_push_data_request")) {
RD_LOGI(rkey.traceID,
Expand Down Expand Up @@ -1525,9 +1559,16 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
RD_LOGT(NO_TRACE_ID, "Data Channel: FetchData received: fetch_req.size={}",
fetch_req->request()->entries()->size());

// Per-entry metadata copied from each FetchData request entry while the reads are issued, so the
// same values can be placed into the matching ResponseEntry when a FetchDataResponse header is
// built after the reads complete. One element is pushed per request entry, in request order.
struct FetchEntryMeta {
int64_t lsn; // copied from req->lsn()
uint64_t dsn; // copied from req->dsn() ("Data Sequence number" in the ResponseEntry schema)
uint64_t raft_term; // copied from req->raft_term()
};
std::vector< sisl::sg_list > sgs_vec;
std::vector< FetchEntryMeta > entry_metas;
std::vector< folly::Future< std::error_code > > futs;
sgs_vec.reserve(fetch_req->request()->entries()->size());
entry_metas.reserve(fetch_req->request()->entries()->size());
futs.reserve(fetch_req->request()->entries()->size());

for (auto const& req : *(fetch_req->request()->entries())) {
Expand All @@ -1543,8 +1584,9 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
sgs.iovs.emplace_back(
iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size});

// accumulate the sgs for later use (send back to the requester));
// accumulate the sgs and per-entry metadata for later use (send back to the requester);
sgs_vec.push_back(sgs);
entry_metas.push_back({req->lsn(), req->dsn(), req->raft_term()});

if (originator != server_id()) {
RD_LOGD(NO_TRACE_ID, "non-originator FetchData received: dsn={} lsn={} originator={}, my_server_id={}",
Expand All @@ -1560,7 +1602,8 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_
}

folly::collectAllUnsafe(futs).thenValue(
[this, rpc_data = std::move(rpc_data), sgs_vec = std::move(sgs_vec)](auto&& vf) {
[this, rpc_data = std::move(rpc_data), sgs_vec = std::move(sgs_vec),
entry_metas = std::move(entry_metas)](auto&& vf) {
for (auto const& err_c : vf) {
const auto& err = err_c.value();
if (err) {
Expand All @@ -1582,20 +1625,55 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_

RD_LOGT(NO_TRACE_ID, "Data Channel: FetchData data read completed for {} buffers", sgs_vec.size());

// Emit a size-prefixed FetchDataResponse FlatBuffer before the block data only when
// data_checksum_enabled is true. When disabled, send raw block data (pre-checksum wire
// format) so old receivers are never surprised by an unexpected header during rolling
// upgrades. Receivers use try-and-fallback to detect the FlatBuffer transparently.
uint8_t* hdr_buf = nullptr;
uint32_t hdr_size = 0;

if (HS_DYNAMIC_CONFIG(consensus.data_checksum_enabled)) {
flatbuffers::FlatBufferBuilder resp_builder;
std::vector< flatbuffers::Offset< ResponseEntry > > resp_entries;
for (size_t i = 0; i < sgs_vec.size(); ++i) {
auto const& sgs = sgs_vec[i];
uint32_t checksum = init_crc32;
for (auto const& iov : sgs.iovs) {
checksum = crc32_ieee(checksum, r_cast< const unsigned char* >(iov.iov_base), iov.iov_len);
}
int64_t const lsn = (i < entry_metas.size()) ? entry_metas[i].lsn : 0;
uint64_t const dsn = (i < entry_metas.size()) ? entry_metas[i].dsn : 0;
uint64_t const raft_term = (i < entry_metas.size()) ? entry_metas[i].raft_term : 0;
resp_entries.push_back(
CreateResponseEntry(resp_builder, lsn, dsn, raft_term,
static_cast< uint32_t >(sgs.size), checksum));
}
resp_builder.FinishSizePrefixed(
CreateFetchDataResponse(resp_builder, server_id(), resp_builder.CreateVector(resp_entries)));

// Heap-copy the FlatBuffer so it outlives resp_builder until the send completion callback.
hdr_size = resp_builder.GetSize();
hdr_buf = new uint8_t[hdr_size];
std::memcpy(hdr_buf, resp_builder.GetBufferPointer(), hdr_size);
}

// now prepare the io_blob_list to response back to requester;
nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{};
if (hdr_buf) { pkts.emplace_back(sisl::io_blob{hdr_buf, hdr_size, false}); }
for (auto const& sgs : sgs_vec) {
auto const ret = sisl::io_blob::sg_list_to_ioblob_list(sgs);
pkts.insert(pkts.end(), ret.begin(), ret.end());
}

rpc_data->set_comp_cb([sgs_vec = std::move(sgs_vec)](boost::intrusive_ptr< sisl::GenericRpcData >&) {
for (auto const& sgs : sgs_vec) {
for (auto const& iov : sgs.iovs) {
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base));
rpc_data->set_comp_cb(
[sgs_vec = std::move(sgs_vec), hdr_buf](boost::intrusive_ptr< sisl::GenericRpcData >&) {
delete[] hdr_buf; // delete[] nullptr is a no-op when checksums are disabled
for (auto const& sgs : sgs_vec) {
for (auto const& iov : sgs.iovs) {
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base));
}
}
}
});
});

rpc_data->send_response(pkts);
});
Expand All @@ -1612,13 +1690,71 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
return;
}

// Try-and-fallback: attempt to parse a size-prefixed FetchDataResponse FlatBuffer at the start
// of the blob. New senders (data_checksum_enabled=true) prepend this header; old senders emit
// raw block data with no prefix. In HomeObject (the layer above HomeStore), raw block data
// begins with DataHeader magic (0x21fdffdba8d68fc6), whose first 4 bytes as a little-endian
// uoffset_t read as ~2.8 GB — far larger than any real response — reliably failing the
// fb_hdr_size <= total_size check below. flatbuffers::Verifier provides a further structural
// guard for any other raw data whose size prefix happens to look plausible.
const flatbuffers::Vector< flatbuffers::Offset< ResponseEntry > >* resp_entries = nullptr;

if (total_size >= sizeof(flatbuffers::uoffset_t)) {
auto const fb_hdr_size = static_cast< uint64_t >(
flatbuffers::ReadScalar< flatbuffers::uoffset_t >(raw_data)) +
sizeof(flatbuffers::uoffset_t);
if (fb_hdr_size <= static_cast< uint64_t >(total_size)) {
flatbuffers::Verifier verifier{raw_data, fb_hdr_size};
if (verifier.VerifySizePrefixedBuffer< FetchDataResponse >(nullptr)) {
auto const fetch_resp = flatbuffers::GetSizePrefixedRoot< FetchDataResponse >(raw_data);
raw_data += fb_hdr_size;
total_size -= fb_hdr_size;
if (fetch_resp->entries()) {
resp_entries = fetch_resp->entries();
if (resp_entries->size() != rreqs.size()) {
RD_LOGW(NO_TRACE_ID,
"Data Channel: FetchData response entry count {} != request count {}, "
"some entries will not be checksum-verified",
resp_entries->size(), rreqs.size());
}
}
}
}
}

// Count only actual block data bytes (framing header excluded).
COUNTER_INCREMENT(m_metrics, fetch_total_blk_size, total_size);

RD_LOGD(NO_TRACE_ID, "Data Channel: FetchData completed for {} requests", rreqs.size());

for (auto const& rreq : rreqs) {
std::vector< repl_req_ptr_t > checksum_mismatch_rreqs;
for (size_t i = 0; i < rreqs.size(); ++i) {
auto const& rreq = rreqs[i];
auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size();

if (data_size > total_size) {
RD_LOGE(NO_TRACE_ID,
"Data Channel: FetchData response truncated: need {} bytes for dsn={} but only {} bytes remain, "
"aborting response processing",
data_size, rreq->dsn(), total_size);
return;
}

if (resp_entries && i < resp_entries->size() && (*resp_entries)[i]->checksum() != 0) {
auto const computed = crc32_ieee(init_crc32, r_cast< const unsigned char* >(raw_data), data_size);
if (computed != (*resp_entries)[i]->checksum()) {
COUNTER_INCREMENT(m_metrics, data_checksum_mismatch_cnt, 1);
RD_LOGE(rreq->traceID(),
"Data Channel: FetchData checksum mismatch dsn={}, expected={:#010x}, computed={:#010x}; "
"re-fetching immediately.",
rreq->dsn(), (*resp_entries)[i]->checksum(), computed);
raw_data += data_size;
total_size -= data_size;
checksum_mismatch_rreqs.emplace_back(rreq);
continue;
}
}

if (!rreq->save_fetched_data(response, raw_data, data_size)) {
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string());
auto const local_size = rreq->local_blkid().blk_count() * get_blk_size();
Expand Down Expand Up @@ -1666,6 +1802,12 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
total_size -= data_size;
}

if (!checksum_mismatch_rreqs.empty()) {
RD_LOGD(NO_TRACE_ID, "Data Channel: Re-fetching {} rreqs that had checksum mismatches",
checksum_mismatch_rreqs.size());
check_and_fetch_remote_data(std::move(checksum_mismatch_rreqs));
}

RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
}

Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class RaftReplDevMetrics : public sisl::MetricsGroup {
REGISTER_COUNTER(read_err_cnt, "total read error count", "read_err_cnt", {"op", "read"});
REGISTER_COUNTER(write_err_cnt, "total write error count", "write_err_cnt", {"op", "write"});
REGISTER_COUNTER(fetch_err_cnt, "total fetch data error count", "fetch_err_cnt", {"op", "fetch"});
REGISTER_COUNTER(data_checksum_mismatch_cnt, "CRC32 mismatches on push/fetch data channels",
"data_checksum_mismatch_cnt", {"op", "checksum"});

REGISTER_COUNTER(fetch_rreq_cnt, "total fetch data count", "fetch_data_req_cnt", {"op", "fetch"});
REGISTER_COUNTER(fetch_total_blk_size, "total fetch data blocks size", "fetch_total_blk_size", {"op", "fetch"});
Expand Down
51 changes: 51 additions & 0 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,57 @@ TEST_F(RaftReplDevTest, Follower_Fetch_OnActive_ReplicaGroup) {
if (g_helper->replica_num() != 0) { g_helper->remove_flip("drop_push_data_request"); }
}

// Happy-path coverage with data checksums explicitly turned on: a batch of writes issued on the
// leader must commit, and every replica must read back the same data. The flag is switched off
// again before cleanup so it does not leak into later tests.
TEST_F(RaftReplDevTest, Checksum_Enabled_PushData_Path) {
    // Sets consensus.data_checksum_enabled to the requested value and saves the settings.
    auto const set_data_checksum = [](bool const enabled) {
        HS_SETTINGS_FACTORY().modifiable_settings(
            [enabled](auto& settings) { settings.consensus.data_checksum_enabled = enabled; });
        HS_SETTINGS_FACTORY().save();
    };

    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
    g_helper->sync_for_test_start();

    LOGINFO("Enabling data_checksum_enabled");
    set_data_checksum(true);

    // Write 20 entries and block until they are committed.
    this->write_on_leader(20, true /* wait_for_commit */);

    g_helper->sync_for_verify_start();
    LOGINFO("Validate all data written so far by reading them");
    this->validate_data();

    // Restore the default (disabled) before the replicas rendezvous for cleanup.
    set_data_checksum(false);
    g_helper->sync_for_cleanup_start();
}

#ifdef _PRERELEASE
// Exercises the fetch-data channel with data checksums turned on. Every replica other than
// replica 0 installs the "drop_push_data_request" flip, so pushed data is discarded there and
// the replica has to fetch it instead; the test then checks that the writes commit and that the
// data read back is intact.
TEST_F(RaftReplDevTest, Checksum_Enabled_FetchData_Path) {
    // Sets consensus.data_checksum_enabled to the requested value and saves the settings.
    auto const set_data_checksum = [](bool const enabled) {
        HS_SETTINGS_FACTORY().modifiable_settings(
            [enabled](auto& settings) { settings.consensus.data_checksum_enabled = enabled; });
        HS_SETTINGS_FACTORY().save();
    };
    // Only replicas with a non-zero replica number drop incoming push-data.
    bool const drops_push_data = (g_helper->replica_num() != 0);

    LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
    g_helper->sync_for_test_start();

    LOGINFO("Enabling data_checksum_enabled");
    set_data_checksum(true);

    if (drops_push_data) {
        LOGINFO("Drop all push-data so follower {} must fetch with checksum header", g_helper->replica_num());
        g_helper->set_basic_flip("drop_push_data_request");
    }

    // Write 20 entries and block until they are committed.
    this->write_on_leader(20, true /* wait_for_commit */);

    g_helper->sync_for_verify_start();
    LOGINFO("Validate all data written so far by reading them");
    this->validate_data();

    // Restore the default (disabled), rendezvous for cleanup, then remove the flip.
    set_data_checksum(false);
    g_helper->sync_for_cleanup_start();
    if (drops_push_data) { g_helper->remove_flip("drop_push_data_request"); }
}
#endif

TEST_F(RaftReplDevTest, Write_With_Diabled_Leader_Push_Data) {
g_helper->set_basic_flip("disable_leader_push_data", std::numeric_limits< int >::max(), 100);
LOGINFO("Homestore replica={} setup completed, all the push_data from leader are disabled",
Expand Down