Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "4.1.15"
version = "4.1.16"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeStore"
Expand Down
9 changes: 4 additions & 5 deletions src/lib/homestore_backend/gc_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ folly::SemiFuture< bool > GCManager::pdev_gc_actor::add_gc_task(uint8_t priority
const auto pg_id = EXvchunk->m_pg_id.value();
m_hs_home_object->gc_manager()->incr_pg_pending_gc_task(pg_id);

if (!m_hs_home_object->can_chunks_in_pg_be_gc(pg_id)) {
if (!m_hs_home_object->is_pg_alive(pg_id)) {
LOGDEBUGMOD(gcmgr, "chunk_id={} belongs to pg {}, which is not eligible for gc at this moment!",
move_from_chunk, pg_id)
m_hs_home_object->gc_manager()->decr_pg_pending_gc_task(pg_id);
Expand Down Expand Up @@ -504,7 +504,7 @@ void GCManager::pdev_gc_actor::handle_recovered_gc_task(
}

// we have no gc_task_guard for recovered gc task, so we need to do this manually to make sure the gc task can be
// marked as completed and the pg can be marked as available for new gc task
// marked as completed
on_gc_task_completed(priority, pg_id, move_from_chunk, move_to_chunk, vchunk_id, true, 0);

GCLOGD(RECOVERD_GC_TASK_ID, pg_id, NO_SHARD_ID,
Expand Down Expand Up @@ -797,9 +797,8 @@ bool GCManager::pdev_gc_actor::copy_valid_data(
move_from_chunk);
}

// check if all the pbas in the valid_blob_indexes are in move_from_chunk, if not, it means the
// shard is being modified during gc, we can not guarantee the data consistency, so we fail this gc
// task and let it be retried later.
// check if all the pbas in the valid_blob_indexes are in move_from_chunk, if not, we cancel this task and retry
// later.
for (const auto& [blob, v] : valid_blob_indexes) {
auto pba = v.pbas();
if (pba.chunk_num() != move_from_chunk) {
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,8 @@ class HSHomeObject : public HomeObjectImpl {
*/
bool pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine = false);

void destroy_pg_resource(pg_id_t pg_id);

bool pause_pg_state_machine(pg_id_t pg_id);

bool resume_pg_state_machine(pg_id_t pg_id);
Expand Down Expand Up @@ -977,7 +979,7 @@ class HSHomeObject : public HomeObjectImpl {
* @param pg_id The ID of the PG whose shards are to be destroyed.
* @return True if the chunks in the PG can be garbage collected, false otherwise.
*/
bool can_chunks_in_pg_be_gc(pg_id_t pg_id) const;
bool is_pg_alive(pg_id_t pg_id) const;

bool pg_exists(pg_id_t pg_id) const;

Expand Down
28 changes: 15 additions & 13 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,19 @@ std::optional< pg_id_t > HSHomeObject::get_pg_id_with_group_id(group_id_t group_

void HSHomeObject::_destroy_pg(pg_id_t pg_id) { pg_destroy(pg_id); }

void HSHomeObject::destroy_pg_resource(pg_id_t pg_id) {
destroy_shards(pg_id);
destroy_hs_resources(pg_id);
destroy_pg_index_table(pg_id);
destroy_pg_superblk(pg_id);

// return pg chunks to dev heap
// which must be done after destroying pg super blk to avoid multiple pg use same chunks
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg={} chunks to dev_heap", pg_id);
LOGI("resource of pg={} is destroyed", pg_id);
}

bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine) {
if (need_to_pause_pg_state_machine && !pause_pg_state_machine(pg_id)) {
LOGI("Failed to pause pg state machine, pg_id={}", pg_id);
Expand All @@ -699,18 +712,7 @@ bool HSHomeObject::pg_destroy(pg_id_t pg_id, bool need_to_pause_pg_state_machine
// we have the assumption that after pg is marked as destroyed, it will not be marked as alive again.
// TODO:: if this assumption is broken, we need to handle it.
gc_mgr_->drain_pg_pending_gc_task(pg_id);

destroy_shards(pg_id);
destroy_hs_resources(pg_id);
destroy_pg_index_table(pg_id);
destroy_pg_superblk(pg_id);

// return pg chunks to dev heap
// which must be done after destroying pg super blk to avoid multiple pg use same chunks
bool res = chunk_selector_->return_pg_chunks_to_dev_heap(pg_id);
RELEASE_ASSERT(res, "Failed to return pg={} chunks to dev_heap", pg_id);

LOGI("pg={} is destroyed", pg_id);
destroy_pg_resource(pg_id);
return true;
}

Expand Down Expand Up @@ -800,7 +802,7 @@ void HSHomeObject::mark_pg_destroyed(pg_id_t pg_id) {
LOGD("pg={} is marked as destroyed", pg_id);
}

bool HSHomeObject::can_chunks_in_pg_be_gc(pg_id_t pg_id) const {
bool HSHomeObject::is_pg_alive(pg_id_t pg_id) const {
auto lg = std::scoped_lock(_pg_lock);
auto hs_pg = const_cast< HS_PG* >(_get_hs_pg_unlocked(pg_id));
if (hs_pg == nullptr) {
Expand Down
33 changes: 32 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
RELEASE_ASSERT_EQ(pbas.size(), 1, "Invalid blklist size");

// on_log_replay_done and baseline resync may cleanup resources for a destroyed pg. If this repl group later joins
// raft or commit stale log, skip applying them to avoid touching freed pg resources. all the logs will be truncated
// after BR is completed, so there is no need to worry about the stale commit after BR.
if (msg_header->msg_type != ReplicationMessageType::CREATE_PG_MSG) {
const auto pg_id = msg_header->pg_id;
if (!home_object_->pg_exists(pg_id)) {
LOGW("skip stale commit lsn={} msg_type={} for non-existent pg={}", lsn, msg_header->msg_type, pg_id);
return;
}

const auto hs_pg = home_object_->_get_hs_pg_unlocked(pg_id);
if ((hs_pg != nullptr) && (hs_pg->pg_sb_->state == PGState::DESTROYED)) {
LOGW("skip stale commit lsn={} msg_type={} for destroyed pg={}", lsn, msg_header->msg_type, pg_id);
return;
}
}

LOGT("applying raft log commit with lsn={}, msg type={}", lsn, msg_header->msg_type);
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_PG_MSG: {
Expand Down Expand Up @@ -498,6 +515,12 @@ void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::sna
// If PG already exists, clean the stale pg resources. Let's resync on a pristine base
if (home_object_->pg_exists(pg_data->pg_id())) {
LOGI("pg already exists, clean pg resources before snapshot, pg={} {}", pg_data->pg_id(), log_suffix);

// we only reset this if destroying pg happens in BR case. for other cases (on_destroy and _exit_pg),
// since this replica will leave the PG and no later logs will be received, no need to reset this.
reset_no_space_left_error_info();
repl_dev()->reset_latch_lsn();

// Need to pause state machine before destroying the PG, if fail, let raft retry.
if (!home_object_->pg_destroy(pg_data->pg_id(), true /* pause state machine */)) {
LOGE("failed to destroy existing pg, let raft retry, pg={} {}", pg_data->pg_id(), log_suffix);
Expand Down Expand Up @@ -1030,7 +1053,15 @@ void ReplicationStateMachine::on_log_replay_done(const homestore::group_id_t& gr
const auto pg_id = pg_id_opt.value();
RELEASE_ASSERT(home_object_->pg_exists(pg_id), "pg={} should exist, but not! fatal error!", pg_id);

const auto& shards_in_pg = (const_cast< HSHomeObject::HS_PG* >(home_object_->_get_hs_pg_unlocked(pg_id)))->shards_;
const auto hs_pg = (const_cast< HSHomeObject::HS_PG* >(home_object_->_get_hs_pg_unlocked(pg_id)));
RELEASE_ASSERT(hs_pg, "Failed to get pg={} when log replay done", pg_id);
if (hs_pg->pg_sb_->state == PGState::DESTROYED) {
// cleaned up the pg resources , which should be cleaned up before restarted but failed.
home_object_->destroy_pg_resource(pg_id);
return;
}

const auto& shards_in_pg = hs_pg->shards_;
auto chunk_selector = home_object_->chunk_selector();

for (const auto& shard_iter : shards_in_pg) {
Expand Down
Loading