Skip to content

Commit f36cdaf

Browse files
committed
add read implementation instead of function
1 parent 142f9a3 commit f36cdaf

3 files changed

Lines changed: 179 additions & 82 deletions

File tree

cli/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use libribzip2::stream::{decode_stream, encode_stream};
1+
use libribzip2::stream::{decode_stream, Encoder};
22
use libribzip2::EncodingStrategy;
33
use num_cpus;
44
use std::fmt;
@@ -119,7 +119,8 @@ fn try_main(opt: Opt) -> Result<(), FileError> {
119119
},
120120
};
121121
let threads_val = threads.unwrap_or(num_cpus::get());
122-
encode_stream(&mut in_file, &mut out_file, threads_val, encoding_strategy);
122+
let mut encoder = Encoder::new(in_file, encoding_strategy, threads_val);
123+
std::io::copy(&mut encoder, &mut out_file).unwrap();
123124
}
124125
}
125126
}

lib/src/bitwise/bitwriter.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,58 @@ pub fn increment_symbol(input: Vec<Bit>) -> Vec<Bit> {
144144
let len = input.len();
145145
convert_to_code_pad_to_n_bits(convert_to_number(&input) + 1, len)
146146
}
147+
148+
#[derive(Default)]
149+
pub struct BufferBitWriter {
150+
pending_bits: Vec<Bit>,
151+
buffer: Vec<u8>,
152+
}
153+
154+
impl BufferBitWriter {
155+
pub fn pull(&mut self, limit: usize) -> Vec<u8> {
156+
let how_many = if limit > self.buffer.len() {
157+
self.buffer.len()
158+
} else {
159+
limit
160+
};
161+
let out = self.buffer[0..how_many].to_vec();
162+
let out2 = self.buffer[how_many..self.buffer.len()].to_vec();
163+
self.buffer = out2;
164+
165+
out
166+
}
167+
168+
pub fn content(&self) -> usize {
169+
self.buffer.len()
170+
}
171+
}
172+
173+
impl BitWriter for BufferBitWriter {
174+
fn write_bits(&mut self, bits_to_write: &[Bit]) -> Result<(), ()> {
175+
self.pending_bits.append(&mut bits_to_write.to_vec());
176+
let mut chunks = self.pending_bits.chunks_exact(8);
177+
for chunk in &mut chunks {
178+
let number = convert_to_number(chunk);
179+
self.buffer.push(number as u8);
180+
}
181+
self.pending_bits = chunks.remainder().to_vec();
182+
Ok(())
183+
}
184+
185+
fn finalize(&mut self) -> Result<(), ()> {
186+
if self.pending_bits.is_empty() {
187+
return Ok(());
188+
}
189+
let mut trailing_zeros = vec![Bit::Zero; 8 - self.pending_bits.len()];
190+
self.pending_bits.append(&mut trailing_zeros);
191+
let byte = convert_to_number(&self.pending_bits);
192+
self.buffer.push(byte as u8);
193+
self.pending_bits.clear();
194+
195+
Ok(())
196+
}
197+
}
198+
147199
#[cfg(test)]
148200
mod test {
149201
use super::*;

lib/src/stream/mod.rs

Lines changed: 124 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::bitwise::bitreader::BitReaderImpl;
22
use crate::bitwise::bitwriter::convert_to_code_pad_to_byte;
3+
use crate::bitwise::bitwriter::BufferBitWriter;
34

45
use crate::block::block_encoder::crc_as_bytes;
56

@@ -12,7 +13,6 @@ use std::thread;
1213

1314
use crate::bitwise::bitreader::BitReader;
1415
use crate::bitwise::bitwriter::BitWriter;
15-
use crate::bitwise::bitwriter::BitWriterImpl;
1616
use crate::block::block_decoder::decode_block;
1717
use crate::block::block_encoder::generate_block_data;
1818
use crate::block::crc32::crc32;
@@ -24,6 +24,129 @@ use super::block::rle::rle_augment;
2424
use super::block::rle::rle_total_size;
2525
use super::block::symbol_statistics::EncodingStrategy;
2626

27+
/// Encoder to bzip2 encode a stream.
28+
/// ```rust
29+
/// let num_threads = 4;
30+
/// let mut encoder = Encoder::new(in_file, encoding_strategy, num_threads);
31+
/// std::io::copy(&mut encoder, &mut out_file)?;
32+
/// ```
33+
pub struct Encoder<T: Read> {
34+
reader: T,
35+
num_threads: usize,
36+
encoding_strategy: EncodingStrategy,
37+
bit_writer: BufferBitWriter,
38+
total_crc: u32,
39+
finalized: bool,
40+
encoded: bool,
41+
initialized: bool,
42+
}
43+
44+
impl<T> Encoder<T>
45+
where
46+
T: Read,
47+
{
48+
pub fn new(reader: T, encoding_strategy: EncodingStrategy, num_threads: usize) -> Self {
49+
Encoder {
50+
reader,
51+
num_threads,
52+
encoding_strategy,
53+
bit_writer: Default::default(),
54+
total_crc: 0,
55+
finalized: false,
56+
encoded: false,
57+
initialized: false,
58+
}
59+
}
60+
}
61+
62+
impl<T> Read for Encoder<T>
63+
where
64+
T: Read,
65+
{
66+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
67+
const RLE_LIMIT: usize = 900_000;
68+
69+
let mut worker_threads = (0..self.num_threads)
70+
.map(|num| WorkerThread::spawn(&format!("Thread {}", num), self.encoding_strategy))
71+
.collect::<Vec<_>>();
72+
73+
if !self.initialized {
74+
self.bit_writer.write_bits(&file_header()).unwrap();
75+
self.initialized = true;
76+
}
77+
78+
while !self.finalized && !self.encoded {
79+
if self.bit_writer.content() > buf.len() {
80+
break;
81+
}
82+
for worker_thread in worker_threads.iter_mut() {
83+
let mut buf = vec![];
84+
let mut rle_data = vec![];
85+
let mut rle_total_count = 0;
86+
let mut rle_count = 0;
87+
let mut rle_last_char = None;
88+
while rle_total_count < RLE_LIMIT {
89+
// RLE can blow up 4chars to 5, hence we keep a safety margin
90+
let to_take = (RLE_LIMIT - rle_data.len()) * 4 / 5;
91+
let mut buf_current = vec![];
92+
if let Ok(size) = self
93+
.reader
94+
.by_ref()
95+
.take(to_take as u64)
96+
.read_to_end(&mut buf_current)
97+
{
98+
if size == 0 {
99+
self.finalized = true;
100+
break;
101+
}
102+
} else {
103+
break;
104+
}
105+
let rle_result = rle(&buf_current, rle_count, rle_last_char);
106+
let mut rle_next = rle_result.data;
107+
let rle_next_count = rle_result.counter;
108+
let rle_next_char = rle_result.last_byte;
109+
110+
let next_data_len = rle_data.len() + rle_next.len();
111+
rle_total_count = rle_total_size(next_data_len, rle_next_count, rle_next_char);
112+
113+
rle_data.append(&mut rle_next);
114+
buf.append(&mut buf_current);
115+
rle_count = rle_next_count;
116+
rle_last_char = rle_next_char;
117+
}
118+
119+
if buf.len() == 0 {
120+
break;
121+
}
122+
123+
let rle_total = rle_augment(&rle_data, rle_count, rle_last_char);
124+
let computed_crc = crc32(&buf);
125+
worker_thread.send_work((computed_crc, rle_total));
126+
}
127+
128+
for worker_thread in worker_threads.iter_mut() {
129+
if worker_thread.pending {
130+
worker_thread.flush_work_buffer(&mut self.bit_writer, &mut self.total_crc);
131+
}
132+
}
133+
}
134+
135+
if self.finalized && !self.encoded {
136+
self.bit_writer
137+
.write_bits(&stream_footer(self.total_crc))
138+
.unwrap();
139+
self.bit_writer.finalize().unwrap();
140+
self.encoded = true;
141+
}
142+
143+
let res = self.bit_writer.pull(buf.len());
144+
buf[0..res.len()].copy_from_slice(&res);
145+
146+
Ok(res.len())
147+
}
148+
}
149+
27150
fn stream_footer(crc: u32) -> Vec<Bit> {
28151
let mut out = vec![];
29152

@@ -100,85 +223,6 @@ impl WorkerThread {
100223
}
101224
}
102225

103-
/// Encode a stream into a writer. Takes a reader and a writer (i.e. two instances of [std::fs::File]).
104-
/// The number of threads and the encoding strategy can be specified.
105-
pub fn encode_stream(
106-
mut read: impl Read,
107-
mut writer: impl Write,
108-
num_threads: usize,
109-
encoding_strategy: EncodingStrategy,
110-
) {
111-
let mut bit_writer = BitWriterImpl::from_writer(&mut writer);
112-
const RLE_LIMIT: usize = 900_000;
113-
let mut total_crc: u32 = 0;
114-
115-
let mut worker_threads = (0..num_threads)
116-
.map(|num| WorkerThread::spawn(&format!("Thread {}", num), encoding_strategy))
117-
.collect::<Vec<_>>();
118-
119-
bit_writer.write_bits(&file_header()).unwrap();
120-
121-
let mut finalize = false;
122-
loop {
123-
if finalize {
124-
break;
125-
}
126-
for worker_thread in worker_threads.iter_mut() {
127-
let mut buf = vec![];
128-
let mut rle_data = vec![];
129-
let mut rle_total_count = 0;
130-
let mut rle_count = 0;
131-
let mut rle_last_char = None;
132-
while rle_total_count < RLE_LIMIT {
133-
// RLE can blow up 4chars to 5, hence we keep a safety margin
134-
let to_take = (RLE_LIMIT - rle_data.len()) * 4 / 5;
135-
let mut buf_current = vec![];
136-
if let Ok(size) = read
137-
.by_ref()
138-
.take(to_take as u64)
139-
.read_to_end(&mut buf_current)
140-
{
141-
if size == 0 {
142-
finalize = true;
143-
break;
144-
}
145-
} else {
146-
break;
147-
}
148-
let rle_result = rle(&buf_current, rle_count, rle_last_char);
149-
let mut rle_next = rle_result.data;
150-
let rle_next_count = rle_result.counter;
151-
let rle_next_char = rle_result.last_byte;
152-
153-
let next_data_len = rle_data.len() + rle_next.len();
154-
rle_total_count = rle_total_size(next_data_len, rle_next_count, rle_next_char);
155-
156-
rle_data.append(&mut rle_next);
157-
buf.append(&mut buf_current);
158-
rle_count = rle_next_count;
159-
rle_last_char = rle_next_char;
160-
}
161-
162-
if buf.len() == 0 {
163-
break;
164-
}
165-
166-
let rle_total = rle_augment(&rle_data, rle_count, rle_last_char);
167-
let computed_crc = crc32(&buf);
168-
worker_thread.send_work((computed_crc, rle_total));
169-
}
170-
171-
for worker_thread in worker_threads.iter_mut() {
172-
if worker_thread.pending {
173-
worker_thread.flush_work_buffer(&mut bit_writer, &mut total_crc);
174-
}
175-
}
176-
}
177-
178-
bit_writer.write_bits(&stream_footer(total_crc)).unwrap();
179-
bit_writer.finalize().unwrap();
180-
}
181-
182226
fn read_file_header(mut bit_reader: impl BitReader) -> Result<(), ()> {
183227
let res = bit_reader.read_bytes(4)?;
184228
match &res[..] {

0 commit comments

Comments
 (0)