diff --git a/conanfile.py b/conanfile.py index 0d20a477d..b5db85ddd 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.5.9" + version = "7.5.10" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 3d3ec3b0f..77a28bffd 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -324,6 +324,15 @@ table Consensus { // Log frequency for gc_repl_reqs messages (log every N times) // 60 * repl_dev_cleanup_interval_sec (60s) means every 1 hour we will log the gc repl reqs info. gc_repl_reqs_log_frequency: uint32 = 60 (hotswap); + + // Controls whether senders compute and attach a CRC32 checksum to pushdata/fetchdata payloads. + // The receiver always skips CRC verification when the checksum field is zero, so this flag only + // affects the send side. Safe to enable via hotswap once all nodes in the cluster are running a + // version that emits the FetchDataResponse FlatBuffer header; old nodes that still send raw block + // data will simply have checksum=0 and receivers fall back gracefully. + // Not enabled by default: TCP already detects partial-transfer corruption; CRC is opt-in for + // environments requiring additional durability guarantees. + data_checksum_enabled: bool = false (hotswap); } table HomeStoreSettings { diff --git a/src/lib/replication/fetch_data_rpc.fbs b/src/lib/replication/fetch_data_rpc.fbs index e809cde42..044e188b1 100644 --- a/src/lib/replication/fetch_data_rpc.fbs +++ b/src/lib/replication/fetch_data_rpc.fbs @@ -19,6 +19,7 @@ table ResponseEntry { dsn : uint64; // Data Sequence number raft_term : uint64; // Raft term number data_size : uint32; // Size of the data which is sent as separate non flatbuffer + checksum: uint32; // CRC32 over the data for this entry; 0 when checksum is disabled } table FetchDataResponse { diff --git a/src/lib/replication/push_data_rpc.fbs b/src/lib/replication/push_data_rpc.fbs index d9a981e7c..5b31d0e1d 100644 --- a/src/lib/replication/push_data_rpc.fbs +++ b/src/lib/replication/push_data_rpc.fbs @@ -10,6 +10,7 @@ table PushDataRequest { user_key : [ubyte]; // User key data data_size : uint32; // Data size, actual data is sent as separate blob not by flatbuffer time_ms: uint64; // time point when originator pushed this request; + checksum: uint32; // CRC32 over the data payload; 0 when checksum is disabled } root_type PushDataRequest; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b83a9993b..6b7fbfe40 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1097,11 +1097,24 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list const& data) { auto& builder = rreq->create_fb_builder(); + // Compute CRC32 over the data payload before serialising so the follower can detect corruption in transit. + // checksum=0 is the sentinel for "not computed"; the receiver skips verification when it sees 0. + // A legitimately computed CRC of 0 (~1 in 4B) is indistinguishable from the sentinel and will + // also skip verification for that packet — an accepted limitation of this design. + uint32_t checksum = 0; + if (HS_DYNAMIC_CONFIG(consensus.data_checksum_enabled)) { + checksum = init_crc32; + for (auto const& iov : data.iovs) { + checksum = crc32_ieee(checksum, r_cast< const unsigned char* >(iov.iov_base), iov.iov_len); + } + } + // Prepare the rpc request packet with all repl_reqs details builder.FinishSizePrefixed(CreatePushDataRequest( builder, rreq->traceID(), server_id(), rreq->term(), rreq->dsn(), builder.CreateVector(rreq->header().cbytes(), rreq->header().size()), - builder.CreateVector(rreq->key().cbytes(), rreq->key().size()), data.size, get_time_since_epoch_ms())); + builder.CreateVector(rreq->key().cbytes(), rreq->key().size()), data.size, get_time_since_epoch_ms(), + checksum)); rreq->m_pkts = sisl::io_blob::sg_list_to_ioblob_list(data); rreq->m_pkts.insert(rreq->m_pkts.begin(), sisl::io_blob{builder.GetBufferPointer(), builder.GetSize(), false}); @@ -1145,8 +1158,15 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d return; } - auto const fb_size = - flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) + sizeof(flatbuffers::uoffset_t); + auto const fb_size = static_cast< uint64_t >( + flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes())) + + sizeof(flatbuffers::uoffset_t); + flatbuffers::Verifier push_verifier{incoming_buf.cbytes(), static_cast< size_t >(fb_size)}; + if (!push_verifier.VerifySizePrefixedBuffer< PushDataRequest >(nullptr)) { + RD_LOGW(NO_TRACE_ID, "Data Channel: PushData FlatBuffer verification failed, ignoring"); + rpc_data->send_response(); + return; + } auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes()); if (fb_size + push_req->data_size() != incoming_buf.size()) { RD_LOGW(NO_TRACE_ID, @@ -1165,6 +1185,20 @@ void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_d RD_LOGD(rkey.traceID, "Data Channel: PushData received: time diff={} ms.", get_elapsed_time_ms(req_orig_time_ms)); + if (push_req->checksum() != 0) { + auto const data_ptr = r_cast< const unsigned char* >(incoming_buf.cbytes() + fb_size); + auto const computed = crc32_ieee(init_crc32, data_ptr, push_req->data_size()); + if (computed != push_req->checksum()) { + COUNTER_INCREMENT(m_metrics, data_checksum_mismatch_cnt, 1); + RD_LOGE(rkey.traceID, + "Data Channel: PushData checksum mismatch dsn={}, expected={:#010x}, computed={:#010x}, dropping " + "(follower will fetch from remote on next Raft retry)", + push_req->dsn(), push_req->checksum(), computed); + rpc_data->send_response(); + return; + } + } + #ifdef _PRERELEASE if (iomgr_flip::instance()->test_flip("drop_push_data_request")) { RD_LOGI(rkey.traceID, @@ -1525,9 +1559,16 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ RD_LOGT(NO_TRACE_ID, "Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size()); + struct FetchEntryMeta { + int64_t lsn; + uint64_t dsn; + uint64_t raft_term; + }; std::vector< sisl::sg_list > sgs_vec; + std::vector< FetchEntryMeta > entry_metas; std::vector< folly::Future< std::error_code > > futs; sgs_vec.reserve(fetch_req->request()->entries()->size()); + entry_metas.reserve(fetch_req->request()->entries()->size()); futs.reserve(fetch_req->request()->entries()->size()); for (auto const& req : *(fetch_req->request()->entries())) { @@ -1543,8 +1584,9 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ sgs.iovs.emplace_back( iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size}); - // accumulate the sgs for later use (send back to the requester)); + // accumulate the sgs and per-entry metadata for later use (send back to the requester); sgs_vec.push_back(sgs); + entry_metas.push_back({req->lsn(), req->dsn(), req->raft_term()}); if (originator != server_id()) { RD_LOGD(NO_TRACE_ID, "non-originator FetchData received: dsn={} lsn={} originator={}, my_server_id={}", @@ -1560,7 +1602,8 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ } folly::collectAllUnsafe(futs).thenValue( - [this, rpc_data = std::move(rpc_data), sgs_vec = std::move(sgs_vec)](auto&& vf) { + [this, rpc_data = std::move(rpc_data), sgs_vec = std::move(sgs_vec), + entry_metas = std::move(entry_metas)](auto&& vf) { for (auto const& err_c : vf) { const auto& err = err_c.value(); if (err) { @@ -1582,20 +1625,55 @@ void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_ RD_LOGT(NO_TRACE_ID, "Data Channel: FetchData data read completed for {} buffers", sgs_vec.size()); + // Emit a size-prefixed FetchDataResponse FlatBuffer before the block data only when + // data_checksum_enabled is true. When disabled, send raw block data (pre-checksum wire + // format) so old receivers are never surprised by an unexpected header during rolling + // upgrades. Receivers use try-and-fallback to detect the FlatBuffer transparently. + uint8_t* hdr_buf = nullptr; + uint32_t hdr_size = 0; + + if (HS_DYNAMIC_CONFIG(consensus.data_checksum_enabled)) { + flatbuffers::FlatBufferBuilder resp_builder; + std::vector< flatbuffers::Offset< ResponseEntry > > resp_entries; + for (size_t i = 0; i < sgs_vec.size(); ++i) { + auto const& sgs = sgs_vec[i]; + uint32_t checksum = init_crc32; + for (auto const& iov : sgs.iovs) { + checksum = crc32_ieee(checksum, r_cast< const unsigned char* >(iov.iov_base), iov.iov_len); + } + int64_t const lsn = (i < entry_metas.size()) ? entry_metas[i].lsn : 0; + uint64_t const dsn = (i < entry_metas.size()) ? entry_metas[i].dsn : 0; + uint64_t const raft_term = (i < entry_metas.size()) ? entry_metas[i].raft_term : 0; + resp_entries.push_back( + CreateResponseEntry(resp_builder, lsn, dsn, raft_term, + static_cast< uint32_t >(sgs.size), checksum)); + } + resp_builder.FinishSizePrefixed( + CreateFetchDataResponse(resp_builder, server_id(), resp_builder.CreateVector(resp_entries))); + + // Heap-copy the FlatBuffer so it outlives resp_builder until the send completion callback. + hdr_size = resp_builder.GetSize(); + hdr_buf = new uint8_t[hdr_size]; + std::memcpy(hdr_buf, resp_builder.GetBufferPointer(), hdr_size); + } + // now prepare the io_blob_list to response back to requester; nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{}; + if (hdr_buf) { pkts.emplace_back(sisl::io_blob{hdr_buf, hdr_size, false}); } for (auto const& sgs : sgs_vec) { auto const ret = sisl::io_blob::sg_list_to_ioblob_list(sgs); pkts.insert(pkts.end(), ret.begin(), ret.end()); } - rpc_data->set_comp_cb([sgs_vec = std::move(sgs_vec)](boost::intrusive_ptr< sisl::GenericRpcData >&) { - for (auto const& sgs : sgs_vec) { - for (auto const& iov : sgs.iovs) { - iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base)); + rpc_data->set_comp_cb( + [sgs_vec = std::move(sgs_vec), hdr_buf](boost::intrusive_ptr< sisl::GenericRpcData >&) { + delete[] hdr_buf; // delete[] nullptr is a no-op when checksums are disabled + for (auto const& sgs : sgs_vec) { + for (auto const& iov : sgs.iovs) { + iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base)); + } } - } - }); + }); rpc_data->send_response(pkts); }); @@ -1612,13 +1690,71 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons return; } + // Try-and-fallback: attempt to parse a size-prefixed FetchDataResponse FlatBuffer at the start + // of the blob. New senders (data_checksum_enabled=true) prepend this header; old senders emit + // raw block data with no prefix. In HomeObject (the layer above HomeStore), raw block data + // begins with DataHeader magic (0x21fdffdba8d68fc6), whose first 4 bytes as a little-endian + // uoffset_t read as ~2.8 GB — far larger than any real response — reliably failing the + // fb_hdr_size <= total_size check below. flatbuffers::Verifier provides a further structural + // guard for any other raw data whose size prefix happens to look plausible. + const flatbuffers::Vector< flatbuffers::Offset< ResponseEntry > >* resp_entries = nullptr; + + if (total_size >= sizeof(flatbuffers::uoffset_t)) { + auto const fb_hdr_size = static_cast< uint64_t >( + flatbuffers::ReadScalar< flatbuffers::uoffset_t >(raw_data)) + + sizeof(flatbuffers::uoffset_t); + if (fb_hdr_size <= static_cast< uint64_t >(total_size)) { + flatbuffers::Verifier verifier{raw_data, fb_hdr_size}; + if (verifier.VerifySizePrefixedBuffer< FetchDataResponse >(nullptr)) { + auto const fetch_resp = flatbuffers::GetSizePrefixedRoot< FetchDataResponse >(raw_data); + raw_data += fb_hdr_size; + total_size -= fb_hdr_size; + if (fetch_resp->entries()) { + resp_entries = fetch_resp->entries(); + if (resp_entries->size() != rreqs.size()) { + RD_LOGW(NO_TRACE_ID, + "Data Channel: FetchData response entry count {} != request count {}, " + "some entries will not be checksum-verified", + resp_entries->size(), rreqs.size()); + } + } + } + } + } + + // Count only actual block data bytes (framing header excluded). COUNTER_INCREMENT(m_metrics, fetch_total_blk_size, total_size); RD_LOGD(NO_TRACE_ID, "Data Channel: FetchData completed for {} requests", rreqs.size()); - for (auto const& rreq : rreqs) { + std::vector< repl_req_ptr_t > checksum_mismatch_rreqs; + for (size_t i = 0; i < rreqs.size(); ++i) { + auto const& rreq = rreqs[i]; auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size(); + if (data_size > total_size) { + RD_LOGE(NO_TRACE_ID, + "Data Channel: FetchData response truncated: need {} bytes for dsn={} but only {} bytes remain, " + "aborting response processing", + data_size, rreq->dsn(), total_size); + return; + } + + if (resp_entries && i < resp_entries->size() && (*resp_entries)[i]->checksum() != 0) { + auto const computed = crc32_ieee(init_crc32, r_cast< const unsigned char* >(raw_data), data_size); + if (computed != (*resp_entries)[i]->checksum()) { + COUNTER_INCREMENT(m_metrics, data_checksum_mismatch_cnt, 1); + RD_LOGE(rreq->traceID(), + "Data Channel: FetchData checksum mismatch dsn={}, expected={:#010x}, computed={:#010x}; " + "re-fetching immediately.", + rreq->dsn(), (*resp_entries)[i]->checksum(), computed); + raw_data += data_size; + total_size -= data_size; + checksum_mismatch_rreqs.emplace_back(rreq); + continue; + } + } + if (!rreq->save_fetched_data(response, raw_data, data_size)) { RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string()); auto const local_size = rreq->local_blkid().blk_count() * get_blk_size(); @@ -1666,6 +1802,12 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons total_size -= data_size; } + if (!checksum_mismatch_rreqs.empty()) { + RD_LOGD(NO_TRACE_ID, "Data Channel: Re-fetching {} rreqs that had checksum mismatches", + checksum_mismatch_rreqs.size()); + check_and_fetch_remote_data(std::move(checksum_mismatch_rreqs)); + } + RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed"); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index c27155025..da58334e6 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -71,6 +71,8 @@ class RaftReplDevMetrics : public sisl::MetricsGroup { REGISTER_COUNTER(read_err_cnt, "total read error count", "read_err_cnt", {"op", "read"}); REGISTER_COUNTER(write_err_cnt, "total write error count", "write_err_cnt", {"op", "write"}); REGISTER_COUNTER(fetch_err_cnt, "total fetch data error count", "fetch_err_cnt", {"op", "fetch"}); + REGISTER_COUNTER(data_checksum_mismatch_cnt, "CRC32 mismatches on push/fetch data channels", + "data_checksum_mismatch_cnt", {"op", "checksum"}); REGISTER_COUNTER(fetch_rreq_cnt, "total fetch data count", "fetch_data_req_cnt", {"op", "fetch"}); REGISTER_COUNTER(fetch_total_blk_size, "total fetch data blocks size", "fetch_total_blk_size", {"op", "fetch"}); diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index eafad14a8..c4c497e2c 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -111,6 +111,57 @@ TEST_F(RaftReplDevTest, Follower_Fetch_OnActive_ReplicaGroup) { if (g_helper->replica_num() != 0) { g_helper->remove_flip("drop_push_data_request"); } } +// Verifies the happy path with checksums explicitly enabled: writes should commit correctly +// and all replicas should hold the same data. +TEST_F(RaftReplDevTest, Checksum_Enabled_PushData_Path) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + LOGINFO("Enabling data_checksum_enabled"); + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = true; }); + HS_SETTINGS_FACTORY().save(); + + this->write_on_leader(20, true /* wait_for_commit */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_data(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = false; }); + HS_SETTINGS_FACTORY().save(); + g_helper->sync_for_cleanup_start(); +} + +#ifdef _PRERELEASE +// Verifies that the fetch path works correctly with checksums enabled. +// Drops all push-data on non-leader replicas so they are forced to fetch, then checks that +// the framing header is correctly parsed and data arrives intact. +TEST_F(RaftReplDevTest, Checksum_Enabled_FetchData_Path) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + + LOGINFO("Enabling data_checksum_enabled"); + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = true; }); + HS_SETTINGS_FACTORY().save(); + + if (g_helper->replica_num() != 0) { + LOGINFO("Drop all push-data so follower {} must fetch with checksum header", g_helper->replica_num()); + g_helper->set_basic_flip("drop_push_data_request"); + } + + this->write_on_leader(20, true /* wait_for_commit */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_data(); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.data_checksum_enabled = false; }); + HS_SETTINGS_FACTORY().save(); + g_helper->sync_for_cleanup_start(); + if (g_helper->replica_num() != 0) { g_helper->remove_flip("drop_push_data_request"); } +} +#endif + TEST_F(RaftReplDevTest, Write_With_Diabled_Leader_Push_Data) { g_helper->set_basic_flip("disable_leader_push_data", std::numeric_limits< int >::max(), 100); LOGINFO("Homestore replica={} setup completed, all the push_data from leader are disabled",