Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/car-index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn bench_car_index(c: &mut Criterion) {
block_on(
index::Builder::from_iter(reference.clone())
.into_writer()
.write_into(&mut v),
.write_zstd_skip_frames_into(&mut v),
)
.unwrap();
index::Reader::new(v).unwrap()
Expand Down
10 changes: 5 additions & 5 deletions src/db/car/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
}

/// Discard reader type and replace with dynamic trait object.
pub fn into_dyn(self) -> AnyCar<Box<dyn super::RandomAccessFileReader>> {
match self {
AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()),
pub fn into_dyn(self) -> Result<AnyCar<Box<dyn super::RandomAccessFileReader>>> {
Ok(match self {
AnyCar::Forest(f) => AnyCar::Forest(f.into_dyn()?),
AnyCar::Plain(p) => AnyCar::Plain(p.into_dyn()),
AnyCar::Memory(m) => AnyCar::Memory(m),
}
})
}

/// Set the z-frame cache of the inner CAR reader.
Expand All @@ -109,7 +109,7 @@ impl<ReaderT: RandomAccessFileReader> AnyCar<ReaderT> {
}

/// Get the index size in bytes
pub fn index_size_bytes(&self) -> Option<u32> {
pub fn index_size_bytes(&self) -> Option<u64> {
match self {
Self::Forest(car) => Some(car.index_size_bytes()),
_ => None,
Expand Down
60 changes: 30 additions & 30 deletions src/db/car/forest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,19 @@ use super::{CacheKey, ZstdFrameCache};
use crate::blocks::{Tipset, TipsetKey};
use crate::chain::FilecoinSnapshotMetadata;
use crate::db::car::RandomAccessFileReader;
use crate::db::car::plain::write_skip_frame_header_async;
use crate::db::car::forest::index::ZstdSkipFramesEncodedDataReader;
use crate::utils::db::car_stream::{CarBlock, CarV1Header, uvi_bytes};
use crate::utils::encoding::from_slice_with_fallback;
use crate::utils::get_size::CidWrapper;
use crate::utils::io::EitherMmapOrRandomAccessFile;
use byteorder::LittleEndian;
use bytes::{BufMut as _, Bytes, BytesMut, buf::Writer};
use cid::Cid;
use futures::{Stream, TryStreamExt as _};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::CborStore as _;
use integer_encoding::VarIntReader;
use nunny::Vec as NonEmpty;
use positioned_io::{Cursor, ReadAt, ReadBytesAtExt, SizeCursor};
use positioned_io::{Cursor, ReadAt, SizeCursor};
use std::io::{Seek, SeekFrom};
use std::path::Path;
use std::sync::{Arc, OnceLock};
Expand All @@ -86,7 +85,7 @@ pub const TEMP_FOREST_CAR_FILE_EXTENSION: &str = ".forest.car.zst.tmp";
pub const ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER: [u8; 4] = [0x50, 0x2A, 0x4D, 0x18];
pub const DEFAULT_FOREST_CAR_FRAME_SIZE: usize = 8000_usize.next_power_of_two();
pub const DEFAULT_FOREST_CAR_COMPRESSION_LEVEL: u16 = zstd::DEFAULT_COMPRESSION_LEVEL as _;
const ZSTD_SKIP_FRAME_LEN: u64 = 8;
pub const ZSTD_SKIP_FRAME_LEN: u64 = 8;

/// `zstd` frame of Forest CAR
pub type ForestCarFrame = (Vec<Cid>, Bytes);
Expand All @@ -95,24 +94,19 @@ pub struct ForestCar<ReaderT> {
// Multiple `ForestCar` structures may share the same cache. The cache key is used to identify
// the origin of a cached z-frame.
cache_key: CacheKey,
indexed: index::Reader<positioned_io::Slice<ReaderT>>,
index_size_bytes: u32,
indexed: index::Reader<index::ZstdSkipFramesEncodedDataReader<positioned_io::Slice<ReaderT>>>,
index_size_bytes: u64,
frame_cache: Arc<ZstdFrameCache>,
header: CarV1Header,
metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
}

impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
pub fn new(reader: ReaderT) -> io::Result<ForestCar<ReaderT>> {
let (header, footer) = Self::validate_car(&reader)?;
let index_size_bytes = reader.read_u32_at::<LittleEndian>(
footer.index.saturating_sub(std::mem::size_of::<u32>() as _),
)?;
let indexed = index::Reader::new(positioned_io::Slice::new(
reader,
footer.index,
Some(index_size_bytes as u64),
))?;
let (header, index_start_pos, index_size_bytes) = Self::validate_car(&reader)?;
let indexed = index::Reader::new(index::ZstdSkipFramesEncodedDataReader::new(
positioned_io::Slice::new(reader, index_start_pos, Some(index_size_bytes)),
)?)?;
Ok(ForestCar {
cache_key: 0,
indexed,
Expand Down Expand Up @@ -141,9 +135,10 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
Self::validate_car(reader).is_ok()
}

fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, ForestCarFooter)> {
fn validate_car(reader: &ReaderT) -> io::Result<(CarV1Header, u64, u64)> {
let mut cursor = SizeCursor::new(&reader);
cursor.seek(SeekFrom::End(-(ForestCarFooter::SIZE as i64)))?;
let index_end_pos = cursor.position();

let mut footer_buffer = [0; ForestCarFooter::SIZE];
cursor.read_exact(&mut footer_buffer)?;
Expand All @@ -153,6 +148,12 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
"not recognizable as a `{FOREST_CAR_FILE_EXTENSION}` file"
))
})?;
let index_start_pos = footer.index.checked_sub(ZSTD_SKIP_FRAME_LEN).ok_or_else(||
invalid_data(format!(
"unexpected error: footer.index({}) < ZSTD_SKIP_FRAME_LEN({ZSTD_SKIP_FRAME_LEN})",
footer.index
)),
)?;

let cursor = Cursor::new_pos(&reader, 0);
let mut header_zstd_frame = decode_zstd_single_frame(cursor)?.into();
Expand All @@ -162,7 +163,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
let header = from_slice_with_fallback::<CarV1Header>(&block_frame)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

Ok((header, footer))
Ok((header, index_start_pos, index_end_pos - index_start_pos))
}

pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
Expand All @@ -175,7 +176,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
}
}

pub fn index_size_bytes(&self) -> u32 {
pub fn index_size_bytes(&self) -> u64 {
self.index_size_bytes
}

Expand All @@ -187,22 +188,22 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
Tipset::load_required(self, &self.heaviest_tipset_key())
}

pub fn into_dyn(self) -> ForestCar<Box<dyn super::RandomAccessFileReader>> {
ForestCar {
pub fn into_dyn(self) -> io::Result<ForestCar<Box<dyn super::RandomAccessFileReader>>> {
Ok(ForestCar {
cache_key: self.cache_key,
indexed: self.indexed.map(|slice| {
let offset = slice.offset();
positioned_io::Slice::new(
Box::new(slice.into_inner()) as Box<dyn RandomAccessFileReader>,
let offset = slice.inner().offset();
ZstdSkipFramesEncodedDataReader::new(positioned_io::Slice::new(
Box::new(slice.into_inner().into_inner()) as Box<dyn RandomAccessFileReader>,
offset,
None,
)
}),
))
})?,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
index_size_bytes: self.index_size_bytes,
frame_cache: self.frame_cache,
header: self.header,
metadata: self.metadata,
}
})
}

pub fn with_cache(self, cache: Arc<ZstdFrameCache>, key: CacheKey) -> Self {
Expand All @@ -217,7 +218,7 @@ impl<ReaderT: super::RandomAccessFileReader> ForestCar<ReaderT> {
pub fn get_reader(&self, k: Cid) -> anyhow::Result<Option<impl Read>> {
for position in self.indexed.get(k)? {
// escape the positioned_io::Slice
let entire_file = self.indexed.reader().get_ref();
let entire_file = self.indexed.reader().inner().get_ref();
// `position` is the frame start offset.
let cursor = Cursor::new_pos(entire_file, position);
let mut decoder = zstd::Decoder::new(cursor)?.single_frame();
Expand Down Expand Up @@ -259,7 +260,7 @@ where
Some(None) => {}
None => {
// Decode the entire frame into memory; the `position` argument is the frame start offset.
let entire_file = indexed.reader().get_ref(); // escape the positioned_io::Slice
let entire_file = indexed.reader().inner().get_ref(); // escape the positioned_io::Slice
let cursor = Cursor::new_pos(entire_file, position);
let mut zstd_frame = decode_zstd_single_frame(cursor)?.into();
// Parse all key-value pairs and insert them into a map
Expand Down Expand Up @@ -332,8 +333,7 @@ impl Encoder {

// Create index
let writer = builder.into_writer();
write_skip_frame_header_async(&mut sink, writer.written_len().try_into().unwrap()).await?;
writer.write_into(&mut sink).await?;
writer.write_zstd_skip_frames_into(&mut sink).await?;

// Write the ForestCAR.zst footer; it is itself a valid ZSTD skip-frame.
let footer = ForestCarFooter {
Expand Down
Loading
Loading