Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 56 additions & 23 deletions src/lib/device/journal_vdev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ void JournalVirtualDev::destroy(logdev_id_t logdev_id) {
}

void JournalVirtualDev::Descriptor::append_chunk() {
std::unique_lock lock{m_chunk_mutex};
// Get a new chunk from the pool.
auto new_chunk = m_vdev.m_chunk_pool->dequeue();
#if 0
Expand Down Expand Up @@ -238,7 +239,7 @@ void JournalVirtualDev::Descriptor::append_chunk() {
}
m_vdev.update_chunk_private(last_chunk, last_chunk_private);
LOGINFOMOD(journalvdev, "Added chunk new {} last {} desc {}", new_chunk->to_string(), last_chunk->chunk_id(),
to_string());
to_string_nolock());

} else {
// If the list is empty, update the new chunk as the head. Only head chunk contains the logdev_id.
Expand All @@ -249,7 +250,7 @@ void JournalVirtualDev::Descriptor::append_chunk() {
// Append the new chunk
m_journal_chunks.push_back(new_chunk);
m_vdev.update_chunk_private(new_chunk, new_chunk_private);
LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string());
LOGINFOMOD(journalvdev, "Added head chunk={} desc {}", new_chunk->to_string(), to_string_nolock());
}
}

Expand Down Expand Up @@ -309,12 +310,13 @@ bool JournalVirtualDev::Descriptor::validate_append_size(size_t req_sz) const {
}

auto JournalVirtualDev::Descriptor::process_pwrite_offset(size_t len, off_t offset) {
std::shared_lock lock{m_chunk_mutex};
// convert logical offset to chunk and its offset
auto const chunk_details = offset_to_chunk(offset);
auto const [chunk, _, offset_in_chunk] = chunk_details;

LOGTRACEMOD(journalvdev, "writing in chunk: {}, offset: 0x{} len: {} offset_in_chunk: 0x{} chunk_sz: {} desc {}",
chunk->chunk_id(), to_hex(offset), len, to_hex(offset_in_chunk), chunk->size(), to_string());
chunk->chunk_id(), to_hex(offset), len, to_hex(offset_in_chunk), chunk->size(), to_string_nolock());

// this assert only valid for pwrite/pwritev, which calls alloc_next_append_blk to get the offset to do the
// write, which guarantees write will with the returned offset will not accross chunk boundary.
Expand Down Expand Up @@ -393,6 +395,7 @@ std::error_code JournalVirtualDev::Descriptor::sync_pwritev(const iovec* iov, in

/////////////////////////////// Read Section //////////////////////////////////
int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_rd) {
std::shared_lock lock{m_chunk_mutex};
if (m_journal_chunks.empty()) { return -1; }

HS_REL_ASSERT_LE(m_seek_cursor, m_end_offset, "seek_cursor {} exceeded end_offset {}", m_seek_cursor, m_end_offset);
Expand Down Expand Up @@ -427,7 +430,7 @@ int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_

if (buf == nullptr) { return size_rd; }

auto ec = sync_pread(buf, size_rd, m_seek_cursor);
auto ec = pread_impl(buf, size_rd, m_seek_cursor);
// TODO: Check if we can have tolerate this error and somehow start homestore without replaying or in degraded mode?
HS_REL_ASSERT(!ec, "Error in reading next stream of bytes, proceeding could cause some inconsistency, exiting");

Expand All @@ -442,6 +445,11 @@ int64_t JournalVirtualDev::Descriptor::sync_next_read(uint8_t* buf, size_t size_
}

std::error_code JournalVirtualDev::Descriptor::sync_pread(uint8_t* buf, size_t size, off_t offset) {
std::shared_lock lock{m_chunk_mutex};
return pread_impl(buf, size, offset);
}

std::error_code JournalVirtualDev::Descriptor::pread_impl(uint8_t* buf, size_t size, off_t offset) {
auto [chunk, index, offset_in_chunk] = offset_to_chunk(offset);

// if the read count is acrossing chunk, only return what's left in this chunk
Expand All @@ -451,11 +459,12 @@ std::error_code JournalVirtualDev::Descriptor::sync_pread(uint8_t* buf, size_t s
}

LOGTRACEMOD(journalvdev, "offset: 0x{} size: {} chunk: {} index: {} offset_in_chunk: 0x{} desc {}", to_hex(offset),
size, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string());
size, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string_nolock());
return m_vdev.sync_read(r_cast< char* >(buf), size, chunk, offset_in_chunk);
}

std::error_code JournalVirtualDev::Descriptor::sync_preadv(iovec* iov, int iovcnt, off_t offset) {
std::shared_lock lock{m_chunk_mutex};
uint64_t len = VirtualDev::get_len(iov, iovcnt);
auto [chunk, index, offset_in_chunk] = offset_to_chunk(offset);

Expand All @@ -471,7 +480,7 @@ std::error_code JournalVirtualDev::Descriptor::sync_preadv(iovec* iov, int iovcn
}

LOGTRACEMOD(journalvdev, "offset: 0x{} iov: {} len: {} chunk: {} index: {} offset_in_chunk: 0x{} desc {}",
to_hex(offset), iovcnt, len, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string());
to_hex(offset), iovcnt, len, chunk->chunk_id(), index, to_hex(offset_in_chunk), to_string_nolock());

return m_vdev.sync_readv(iov, iovcnt, chunk, offset_in_chunk);
}
Expand All @@ -498,6 +507,7 @@ off_t JournalVirtualDev::Descriptor::lseek(off_t offset, int whence) {
* @brief :- it returns the vdev offset after nbytes from start offset
*/
off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
std::shared_lock lock{m_chunk_mutex};
if (nbytes == 0 || m_journal_chunks.empty()) {
// If no chunks return start offset.
return data_start_offset();
Expand Down Expand Up @@ -527,20 +537,26 @@ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
return vdev_offset;
}

void JournalVirtualDev::Descriptor::init_data_start_offset(off_t offset) {
std::unique_lock lock{m_chunk_mutex};
update_data_start_offset(offset);
}

void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) {
// Refactor this code to truncate.
if (!m_journal_chunks.empty()) {
m_data_start_offset = offset;
auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
m_data_start_offset.store(offset, std::memory_order_release);
auto data_start_offset_aligned = sisl::round_down(offset, m_vdev.info().chunk_size);
m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size;
LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string());
LOGINFOMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string_nolock());
RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch {}",
to_string());
to_string_nolock());
} else {
// If there are no chunks, we round up to the next chunk size.
m_data_start_offset = sisl::round_up(offset, m_vdev.info().chunk_size);
m_end_offset = m_data_start_offset;
LOGINFOMOD(journalvdev, "No chunks, updated data start offset off 0x{} {}", to_hex(offset), to_string());
const off_t rounded = sisl::round_up(offset, m_vdev.info().chunk_size);
m_data_start_offset.store(rounded, std::memory_order_release);
m_end_offset = rounded;
LOGINFOMOD(journalvdev, "No chunks, updated data start offset off 0x{} {}", to_hex(offset), to_string_nolock());
}
}

Expand All @@ -567,10 +583,11 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {
}

off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
std::unique_lock lock{m_chunk_mutex};
const off_t ds_off = data_start_offset();
COUNTER_INCREMENT(m_vdev.m_metrics, vdev_truncate_count, 1);
HS_PERIODIC_LOG(DEBUG, journalvdev, "truncating to logical offset: 0x{} desc {}", to_hex(truncate_offset),
to_string());
to_string_nolock());

uint64_t size_to_truncate{0};
if (truncate_offset >= ds_off) {
Expand Down Expand Up @@ -615,7 +632,7 @@ off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
off_t tail_off =
static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
auto chunk_size = m_vdev.info().chunk_size;
HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate begin truncate {} desc {}", to_hex(truncate_offset), to_string());
HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate begin truncate {} desc {}", to_hex(truncate_offset), to_string_nolock());

#ifdef _PRERELEASE
for (auto it = m_journal_chunks.begin(); it != m_journal_chunks.end(); ++it) {
Expand Down Expand Up @@ -654,7 +671,7 @@ off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
LOGINFOMOD(journalvdev,
"Released chunk_id={} log_dev={} cover={} truncate_offset={} tail={} end_of_chunk={} desc {}",
chunk->chunk_id(), m_logdev_id, to_hex(cover_offset), to_hex(truncate_offset), to_hex(tail_off),
m_vdev.get_end_of_chunk(chunk), to_string());
m_vdev.get_end_of_chunk(chunk), to_string_nolock());
m_vdev.release_chunk_to_pool(chunk);
} else {
++it;
Expand Down Expand Up @@ -682,7 +699,7 @@ off_t JournalVirtualDev::Descriptor::truncate(off_t truncate_offset) {
}
#endif

HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate end truncate {} desc {}", to_hex(truncate_offset), to_string());
HS_PERIODIC_LOG(DEBUG, journalvdev, "Truncate end truncate {} desc {}", to_hex(truncate_offset), to_string_nolock());
return data_start_offset();
}

Expand Down Expand Up @@ -724,7 +741,7 @@ uint64_t JournalVirtualDev::Descriptor::logical_to_dev_offset(off_t log_offset,

std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::offset_to_chunk(off_t log_offset,
bool check) const {
uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
uint64_t chunk_aligned_offset = sisl::round_down(m_data_start_offset.load(std::memory_order_relaxed), m_vdev.info().chunk_size);
uint64_t off_l{static_cast< uint64_t >(log_offset) - chunk_aligned_offset};
uint32_t index = 0;
for (auto& chunk : m_journal_chunks) {
Expand All @@ -736,11 +753,12 @@ std::tuple< shared< Chunk >, uint32_t, off_t > JournalVirtualDev::Descriptor::of
}
}

if (check) { HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string()); }
if (check) { HS_DBG_ASSERT(false, "Input log_offset is invalid: {} {}", log_offset, to_string_nolock()); }
return {nullptr, 0L, 0L};
}

bool JournalVirtualDev::Descriptor::is_offset_at_last_chunk(off_t bytes_offset) {
std::shared_lock lock{m_chunk_mutex};
auto [chunk, chunk_index, _] = offset_to_chunk(bytes_offset, false);
if (chunk == nullptr) return true;
if (chunk_index == m_journal_chunks.size() - 1) { return true; }
Expand Down Expand Up @@ -777,11 +795,11 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const {
return (offset_in_chunk + size > chunk->size());
}

nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
nlohmann::json JournalVirtualDev::Descriptor::get_status_nolock(int log_level) const {
nlohmann::json j;
j["logdev"] = m_logdev_id;
j["seek_cursor"] = m_seek_cursor;
j["data_start_offset"] = m_data_start_offset;
j["data_start_offset"] = m_data_start_offset.load(std::memory_order_relaxed);
j["end_offset"] = m_end_offset;
j["write_size"] = m_write_sz_in_total.load(std::memory_order_relaxed);
j["truncate_done"] = m_truncate_done;
Expand All @@ -807,17 +825,32 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
return j;
}

std::string JournalVirtualDev::Descriptor::to_string() const {
nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
std::shared_lock lock{m_chunk_mutex};
return get_status_nolock(log_level);
}

std::string JournalVirtualDev::Descriptor::to_string_nolock() const {
off_t tail =
static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
std::string str{fmt::format("log_dev={};ds=0x{};end=0x{};writesz={};tail=0x{};"
"rsvdsz={};chunks={};trunc={};total={};seek=0x{} ",
m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset),
m_logdev_id, to_hex(m_data_start_offset.load(std::memory_order_relaxed)), to_hex(m_end_offset),
m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz,
m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))};
return str;
}

std::string JournalVirtualDev::Descriptor::to_string() const {
std::shared_lock lock{m_chunk_mutex};
return to_string_nolock();
}

uint32_t JournalVirtualDev::Descriptor::num_chunks_used() const {
std::shared_lock lock{m_chunk_mutex};
return m_journal_chunks.size();
}

uint64_t JournalVirtualDev::used_size() const {
std::lock_guard lock{m_mutex};
uint64_t total_size = 0;
Expand Down
32 changes: 21 additions & 11 deletions src/lib/device/journal_vdev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <atomic>
#include <functional>
#include <memory>
#include <shared_mutex>
#include <vector>
#include <condition_variable>

Expand Down Expand Up @@ -61,14 +62,15 @@ class JournalVirtualDev : public VirtualDev {
// off_t is long. make it uint64_t ?
off_t m_seek_cursor{0}; // the seek cursor

off_t m_data_start_offset{0}; // Start offset of where actual data begin for this vdev
std::atomic< off_t > m_data_start_offset{0}; // Start offset of where actual data begin for this vdev
std::atomic< uint64_t > m_write_sz_in_total{0}; // Size will be decreased by truncate and increased by append;
bool m_truncate_done{false};
uint64_t m_reserved_sz{0}; // write size within chunk, used to check chunk boundary;
std::vector< shared< Chunk > > m_journal_chunks; // Chunks part of this journal in order.
uint64_t m_total_size{0}; // Total size of all chunks.
off_t m_end_offset{0}; // Offset right to window. Never reduced. Increased in multiple of chunk size.
bool m_end_offset_set{false}; // Adjust the m_end_offset only once during init.
mutable std::shared_mutex m_chunk_mutex; // Protects m_journal_chunks and m_data_start_offset.
friend class JournalVirtualDev;

public:
Expand Down Expand Up @@ -233,26 +235,24 @@ class JournalVirtualDev : public VirtualDev {
*
* @return : the start logical offset where data starts;
*/
off_t data_start_offset() const { return m_data_start_offset; }
off_t data_start_offset() const { return m_data_start_offset.load(std::memory_order_acquire); }

/**
* @brief : Set the data start offset during init/recovery, before concurrent I/O begins.
* Acquires unique_lock on m_chunk_mutex defensively.
*/
void init_data_start_offset(off_t offset);

off_t end_offset() const { return m_end_offset; }

uint64_t write_sz_in_total() const { return m_write_sz_in_total.load(); }

uint32_t num_chunks_used() const { return m_journal_chunks.size(); }
uint32_t num_chunks_used() const;

bool truncate_done() const { return m_truncate_done; }

uint64_t reserved_size() const { return m_reserved_sz; }

/**
* @brief : persist start logical offset to vdev's super block
* Supposed to be called when truncate happens;
*
* @param offset : the start logical offset to be persisted
*/
void update_data_start_offset(off_t offset);

/**
* @brief : get the logical tail offset;
*
Expand Down Expand Up @@ -393,6 +393,16 @@ class JournalVirtualDev : public VirtualDev {
bool is_alloc_accross_chunk(size_t size) const;

auto get_dev_details(size_t len, off_t offset);

// Unlocked read implementation — must be called with m_chunk_mutex held (shared or unique).
std::error_code pread_impl(uint8_t* buf, size_t count_in, off_t offset);

// Unlocked body of to_string/get_status — must be called with m_chunk_mutex held.
std::string to_string_nolock() const;
nlohmann::json get_status_nolock(int log_level) const;

// Must only be called from truncate() while m_chunk_mutex unique_lock is held.
void update_data_start_offset(off_t offset);
};

/* Create a new virtual dev for these parameters */
Expand Down
4 changes: 2 additions & 2 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) {
if (format) {
HS_LOG_ASSERT(m_logdev_meta.is_empty(), "Expected meta to be not present");
m_logdev_meta.create(m_logdev_id, m_flush_mode, m_parent_id);
m_vdev_jd->update_data_start_offset(0);
m_vdev_jd->init_data_start_offset(0);
} else {
HS_LOG_ASSERT(!m_logdev_meta.is_empty(),
"Expected meta data to be read already before loading this log dev id: {}", m_logdev_id);
Expand All @@ -82,7 +82,7 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) {
THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ",
m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx());

m_vdev_jd->update_data_start_offset(m_logdev_meta.get_start_dev_offset());
m_vdev_jd->init_data_start_offset(m_logdev_meta.get_start_dev_offset());
m_log_idx = m_logdev_meta.get_start_log_idx();
do_load(m_logdev_meta.get_start_dev_offset());
m_log_records->reinit(m_log_idx);
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_journal_vdev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class JournalDescriptorTest {
void restore() {
auto vdev = hs()->logstore_service().get_vdev();
m_vdev_jd = vdev->open(m_logdev_id);
m_vdev_jd->update_data_start_offset(last_start_offset);
m_vdev_jd->init_data_start_offset(last_start_offset);
m_vdev_jd->update_tail_offset(last_tail_offset);
}

Expand Down