diff --git a/src/lib/device/journal_vdev.cpp b/src/lib/device/journal_vdev.cpp index 3e4dda2a0..39b6728a7 100644 --- a/src/lib/device/journal_vdev.cpp +++ b/src/lib/device/journal_vdev.cpp @@ -206,6 +206,7 @@ void JournalVirtualDev::destroy(logdev_id_t logdev_id) { } void JournalVirtualDev::Descriptor::append_chunk() { + std::unique_lock lock{m_chunk_mutex}; // Get a new chunk from the pool. auto new_chunk = m_vdev.m_chunk_pool->dequeue(); #if 0 @@ -238,7 +239,7 @@ void JournalVirtualDev::Descriptor::append_chunk() { } m_vdev.update_chunk_private(last_chunk, last_chunk_private); LOGINFOMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->to_string(), last_chunk->chunk_id(), - to_string()); + to_string_nolock()); } else { // If the list is empty, update the new chunk as the head. Only head chunk contains the logdev_id. @@ -249,7 +250,7 @@ void JournalVirtualDev::Descriptor::append_chunk() { // Append the new chunk m_journal_chunks.push_back(new_chunk); m_vdev.update_chunk_private(new_chunk, new_chunk_private); - LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string()); + LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string_nolock()); } } @@ -309,12 +310,13 @@ bool JournalVirtualDev::Descriptor::validate_append_size(size_t req_sz) const { } auto JournalVirtualDev::Descriptor::process_pwrite_offset(size_t len, off_t offset) { + std::shared_lock lock{m_chunk_mutex}; // convert logical offset to chunk and its offset auto const chunk_details = offset_to_chunk(offset); auto const [chunk, _, offset_in_chunk] = chunk_details; LOGTRACEMOD(journalvdev, "writing in chunk: {}, offset: 0x{} len: {} offset_in_chunk: 0x{} chunk_sz: {} desc {}", - chunk->chunk_id(), to_hex(offset), len, to_hex(offset_in_chunk), chunk->size(), to_string()); + chunk->chunk_id(), to_hex(offset), len, to_hex(offset_in_chunk), chunk->size(), to_string_nolock()); // this assert only valid for pwrite/pwritev, which calls 
alloc_next_append_blk to get the offset to do the // write, which guarantees write will with the returned offset will not accross chunk boundary. @@ -393,6 +395,7 @@ std::error_code JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, in /////////////////////////////// Read Section ////////////////////////////////// int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) { + std::shared_lock lock{m_chunk_mutex}; if (m_journal_chunks.empty()) { return -1; } HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset); @@ -427,7 +430,7 @@ int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_ if (buf == nullptr) { return size_rd; } - auto ec = sync_pread(buf, size_rd, m_seek_cursor); + auto ec = pread_impl(buf, size_rd, m_seek_cursor); // TODO: Check if we can have tolerate this error and somehow start homestore without replaying or in degraded mode? HS_REL_ASSERT(!ec, "Error in reading next stream of bytes, proceeding could cause some inconsistency, exiting"); @@ -442,6 +445,11 @@ int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_ } std::error_code JournalVirtualDev::Descriptor::sync_pread(uint8_t* buf, size_t size, off_t offset) { + std::shared_lock lock{m_chunk_mutex}; + return pread_impl(buf, size, offset); +} + +std::error_code JournalVirtualDev::Descriptor::pread_impl(uint8_t* buf, size_t size, off_t offset) { auto [chunk, index, offset_in_chunk] = offset_to_chunk(offset); // if the read count is acrossing chunk, only return what's left in this chunk @@ -451,11 +459,12 @@ std::error_code JournalVirtualDev::Descriptor::sync_pread(uint8_t* buf, size_t s } LOGTRACEMOD(journalvdev, "offset: 0x{} size: {} chunk: {} index: {} offset_in_chunk: 0x{} desc {}", to_hex(offset), - size, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string()); + size, chunk->chunk_id(), index, to_hex(offset_in_chunk), 
to_string_nolock()); return m_vdev.sync_read(r_cast< char* >(buf), size, chunk, offset_in_chunk); } std::error_code JournalVirtualDev::Descriptor::sync_preadv(iovec* iov, int iovcnt, off_t offset) { + std::shared_lock lock{m_chunk_mutex}; uint64_t len = VirtualDev::get_len(iov, iovcnt); auto [chunk, index, offset_in_chunk] = offset_to_chunk(offset); @@ -471,7 +480,7 @@ std::error_code JournalVirtualDev::Descriptor::sync_preadv(iovec* iov, int iovcn } LOGTRACEMOD(journalvdev, "offset: 0x{} iov: {} len: {} chunk: {} index: {} offset_in_chunk: 0x{} desc {}", - to_hex(offset), iovcnt, len, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string()); + to_hex(offset), iovcnt, len, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string_nolock()); return m_vdev.sync_readv(iov, iovcnt, chunk, offset_in_chunk); } @@ -498,6 +507,7 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) { * @brief :- it returns the vdev offset after nbytes from start offset */ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { + std::shared_lock lock{m_chunk_mutex}; if (nbytes == 0 || m_journal_chunks.empty()) { // If no chunks return start offset. return data_start_offset(); @@ -527,20 +537,26 @@ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const { return vdev_offset; } +void JournalVirtualDev::Descriptor::init_data_start_offset(off_t offset) { + std::unique_lock lock{m_chunk_mutex}; + update_data_start_offset(offset); +} + void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) { // Refactor this code to truncate. 
if (!m_journal_chunks.empty()) { - m_data_start_offset = offset; - auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); + m_data_start_offset.store(offset, std::memory_order_release); + auto data_start_offset_aligned = sisl::round_down(offset, m_vdev.info().chunk_size); m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size; - LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string()); + LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string_nolock()); RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}", - to_string()); + to_string_nolock()); } else { // If there are no chunks, we round up to the next chunk size. - m_data_start_offset = sisl::round_up(offset, m_vdev.info().chunk_size); - m_end_offset = m_data_start_offset; - LOGINFOMOD(journalvdev, "No chunks, updated data start offset off 0x{} {}", to_hex(offset), to_string()); + const off_t rounded = sisl::round_up(offset, m_vdev.info().chunk_size); + m_data_start_offset.store(rounded, std::memory_order_release); + m_end_offset = rounded; + LOGINFOMOD(journalvdev, "No chunks, updated data start offset off 0x{} {}", to_hex(offset), to_string_nolock()); } } @@ -567,10 +583,11 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) { } off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { + std::unique_lock lock{m_chunk_mutex}; const off_t ds_off = data_start_offset(); COUNTER_INCREMENT(m_vdev.m_metrics, vdev_truncate_count, 1); HS_PERIODIC_LOG(DEBUG, journalvdev, "truncating to logical offset: 0x{} desc {}", to_hex(truncate_offset), - to_string()); + to_string_nolock()); uint64_t size_to_truncate{0}; if (truncate_offset >= ds_off) { @@ -615,7 +632,7 @@ off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { off_t tail_off = static_cast< off_t >(data_start_offset() + 
m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz; auto chunk_size = m_vdev.info().chunk_size; - HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate begin truncate {} desc {}", to_hex(truncate_offset), to_string()); + HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate begin truncate {} desc {}", to_hex(truncate_offset), to_string_nolock()); #ifdef _PRERELEASE for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) { @@ -654,7 +671,7 @@ off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { LOGINFOMOD(journalvdev, "Released chunk_id={} log_dev={} cover={} truncate_offset={} tail={} end_of_chunk={} desc {}", chunk->chunk_id(), m_logdev_id, to_hex(cover_offset), to_hex(truncate_offset), to_hex(tail_off), - m_vdev.get_end_of_chunk(chunk), to_string()); + m_vdev.get_end_of_chunk(chunk), to_string_nolock()); m_vdev.release_chunk_to_pool(chunk); } else { ++it; @@ -682,7 +699,7 @@ off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) { } #endif - HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate end truncate {} desc {}", to_hex(truncate_offset), to_string()); + HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate end truncate {} desc {}", to_hex(truncate_offset), to_string_nolock()); return data_start_offset(); } @@ -724,7 +741,7 @@ uint64_t JournalVirtualDev::Descriptor::logical_to_dev_offset(off_t log_offset, std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset, bool check) const { - uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size); + uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset.load(std::memory_order_relaxed), m_vdev.info().chunk_size); uint64_t off_l{static_cast< uint64_t >(log_offset) - chunk_aligned_offset}; uint32_t index = 0; for (auto& chunk : m_journal_chunks) { @@ -736,11 +753,12 @@ std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::of } } - if (check) { 
HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string()); } + if (check) { HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string_nolock()); } return {nullptr, 0L, 0L}; } bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) { + std::shared_lock lock{m_chunk_mutex}; auto [chunk, chunk_index, _] = offset_to_chunk(bytes_offset, false); if (chunk == nullptr) return true; if (chunk_index == m_journal_chunks.size() - 1) { return true; } @@ -777,11 +795,11 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const { return (offset_in_chunk + size > chunk->size()); } -nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { +nlohmann::json JournalVirtualDev::Descriptor::get_status_nolock(int log_level) const { nlohmann::json j; j["logdev"] = m_logdev_id; j["seek_cursor"] = m_seek_cursor; - j["data_start_offset"] = m_data_start_offset; + j["data_start_offset"] = m_data_start_offset.load(std::memory_order_relaxed); j["end_offset"] = m_end_offset; j["write_size"] = m_write_sz_in_total.load(std::memory_order_relaxed); j["truncate_done"] = m_truncate_done; @@ -807,17 +825,32 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { return j; } -std::string JournalVirtualDev::Descriptor::to_string() const { +nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const { + std::shared_lock lock{m_chunk_mutex}; + return get_status_nolock(log_level); +} + +std::string JournalVirtualDev::Descriptor::to_string_nolock() const { off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz; std::string str{fmt::format("log_dev={};ds=0x{};end=0x{};writesz={};tail=0x{};" "rsvdsz={};chunks={};trunc={};total={};seek=0x{} ", - m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset), + m_logdev_id, to_hex(m_data_start_offset.load(std::memory_order_relaxed)), 
to_hex(m_end_offset), m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))}; return str; } +std::string JournalVirtualDev::Descriptor::to_string() const { + std::shared_lock lock{m_chunk_mutex}; + return to_string_nolock(); +} + +uint32_t JournalVirtualDev::Descriptor::num_chunks_used() const { + std::shared_lock lock{m_chunk_mutex}; + return m_journal_chunks.size(); +} + uint64_t JournalVirtualDev::used_size() const { std::lock_guard lock{m_mutex}; uint64_t total_size = 0; diff --git a/src/lib/device/journal_vdev.hpp b/src/lib/device/journal_vdev.hpp index 460db4012..d434ab2df 100644 --- a/src/lib/device/journal_vdev.hpp +++ b/src/lib/device/journal_vdev.hpp @@ -19,6 +19,7 @@ #include #include #include +#include <shared_mutex> #include #include @@ -61,7 +62,7 @@ class JournalVirtualDev : public VirtualDev { // off_t is long. make it uint64_t ? off_t m_seek_cursor{0}; // the seek cursor - off_t m_data_start_offset{0}; // Start offset of where actual data begin for this vdev + std::atomic< off_t > m_data_start_offset{0}; // Start offset of where actual data begin for this vdev std::atomic< uint64_t > m_write_sz_in_total{0}; // Size will be decreased by truncate and increased by append; bool m_truncate_done{false}; uint64_t m_reserved_sz{0}; // write size within chunk, used to check chunk boundary; @@ -69,6 +70,7 @@ class JournalVirtualDev : public VirtualDev { uint64_t m_total_size{0}; // Total size of all chunks. off_t m_end_offset{0}; // Offset right to window. Never reduced. Increased in multiple of chunk size. bool m_end_offset_set{false}; // Adjust the m_end_offset only once during init. + mutable std::shared_mutex m_chunk_mutex; // Protects m_journal_chunks and m_data_start_offset. 
friend class JournalVirtualDev; public: @@ -233,26 +235,24 @@ class JournalVirtualDev : public VirtualDev { * * @return : the start logical offset where data starts; */ - off_t data_start_offset() const { return m_data_start_offset; } + off_t data_start_offset() const { return m_data_start_offset.load(std::memory_order_acquire); } + + /** + * @brief : Set the data start offset during init/recovery, before concurrent I/O begins. + * Acquires unique_lock on m_chunk_mutex defensively. + */ + void init_data_start_offset(off_t offset); off_t end_offset() const { return m_end_offset; } uint64_t write_sz_in_total() const { return m_write_sz_in_total.load(); } - uint32_t num_chunks_used() const { return m_journal_chunks.size(); } + uint32_t num_chunks_used() const; bool truncate_done() const { return m_truncate_done; } uint64_t reserved_size() const { return m_reserved_sz; } - /** - * @brief : persist start logical offset to vdev's super block - * Supposed to be called when truncate happens; - * - * @param offset : the start logical offset to be persisted - */ - void update_data_start_offset(off_t offset); - /** * @brief : get the logical tail offset; * @@ -393,6 +393,16 @@ class JournalVirtualDev : public VirtualDev { bool is_alloc_accross_chunk(size_t size) const; auto get_dev_details(size_t len, off_t offset); + + // Unlocked read implementation — must be called with m_chunk_mutex held (shared or unique). + std::error_code pread_impl(uint8_t* buf, size_t count_in, off_t offset); + + // Unlocked body of to_string/get_status — must be called with m_chunk_mutex held. + std::string to_string_nolock() const; + nlohmann::json get_status_nolock(int log_level) const; + + // Unlocked body shared by truncate() and init_data_start_offset(); caller must hold m_chunk_mutex as a unique_lock. 
+ void update_data_start_offset(off_t offset); }; /* Create a new virtual dev for these parameters */ diff --git a/src/lib/logstore/log_dev.cpp b/src/lib/logstore/log_dev.cpp index 5a627e645..fbd639b83 100644 --- a/src/lib/logstore/log_dev.cpp +++ b/src/lib/logstore/log_dev.cpp @@ -68,7 +68,7 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) { if (format) { HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present"); m_logdev_meta.create(m_logdev_id, m_flush_mode, m_parent_id); - m_vdev_jd->update_data_start_offset(0); + m_vdev_jd->init_data_start_offset(0); } else { HS_LOG_ASSERT(!m_logdev_meta.is_empty(), "Expected meta data to be read already before loading this log dev id: {}", m_logdev_id); @@ -82,7 +82,7 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) { THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ", m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx()); - m_vdev_jd->update_data_start_offset(m_logdev_meta.get_start_dev_offset()); + m_vdev_jd->init_data_start_offset(m_logdev_meta.get_start_dev_offset()); m_log_idx = m_logdev_meta.get_start_log_idx(); do_load(m_logdev_meta.get_start_dev_offset()); m_log_records->reinit(m_log_idx); diff --git a/src/tests/test_journal_vdev.cpp b/src/tests/test_journal_vdev.cpp index 8a06911cf..657c05221 100644 --- a/src/tests/test_journal_vdev.cpp +++ b/src/tests/test_journal_vdev.cpp @@ -116,7 +116,7 @@ class JournalDescriptorTest { void restore() { auto vdev = hs()->logstore_service().get_vdev(); m_vdev_jd = vdev->open(m_logdev_id); - m_vdev_jd->update_data_start_offset(last_start_offset); + m_vdev_jd->init_data_start_offset(last_start_offset); m_vdev_jd->update_tail_offset(last_tail_offset); }