Skip to content

Commit e5ef168

Browse files
committed
Using an unrolled linked list to halve memory usage
1 parent 187486f commit e5ef168

10 files changed

Lines changed: 606 additions & 174 deletions

File tree

src/lib.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::borrow::Cow;
2+
use std::sync::Arc;
23

34
mod block_read_write;
45

@@ -28,6 +29,56 @@ impl<'a> Record<'a> {
2829
}
2930
}
3031

32+
#[derive(Clone, Default, Debug, Ord, PartialOrd, Eq, PartialEq)]
33+
pub struct FileNumber {
34+
file_number: Arc<u64>,
35+
}
36+
37+
impl FileNumber {
38+
fn new(file_number: u64) -> Self {
39+
FileNumber {
40+
file_number: Arc::new(file_number),
41+
}
42+
}
43+
44+
/// Returns whether there is no clone of this FileNumber in existance.
45+
///
46+
/// /!\ care should be taken to not have some other code store a &FileNumber which could alias
47+
/// with self as it might then be sementically incorrect to delete content based only on this
48+
/// returning `true`.
49+
pub fn can_be_deleted(&self) -> bool {
50+
Arc::strong_count(&self.file_number) == 1
51+
}
52+
53+
#[cfg(test)]
54+
pub fn unroll(&self, tracker: &crate::rolling::FileTracker) -> Vec<u64> {
55+
let mut file = self.clone();
56+
let mut file_numbers = Vec::new();
57+
loop {
58+
file_numbers.push(file.file_number());
59+
if let Some(next_file) = tracker.next(&file) {
60+
file = next_file;
61+
} else {
62+
return file_numbers;
63+
}
64+
}
65+
}
66+
67+
pub fn filename(&self) -> String {
68+
format!("wal-{:020}", self.file_number)
69+
}
70+
71+
#[cfg(test)]
72+
pub fn file_number(&self) -> u64 {
73+
*self.file_number
74+
}
75+
76+
#[cfg(test)]
77+
pub fn for_test(file_number: u64) -> Self {
78+
FileNumber::new(file_number)
79+
}
80+
}
81+
3182
/// Resources used by mrecordlog
3283
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3384
pub struct ResourceUsage {

src/mem/arena.rs

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
use std::time::{Duration, Instant};
2+
3+
#[cfg(not(test))]
4+
pub const PAGE_SIZE: usize = 1 << 20;
5+
6+
#[cfg(test)]
7+
pub const PAGE_SIZE: usize = 7;
8+
9+
// TODO make it an array once we get a way to allocate array on the heap.
10+
pub type Page = Box<[u8]>;
11+
12+
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
13+
pub struct PageId(usize);
14+
15+
/// An arena of fixed sized pages.
16+
#[derive(Default)]
17+
pub struct Arena {
18+
/// We use an array to store the list of pages.
19+
/// It can be seen as an efficient map from page id to pages.
20+
///
21+
/// This map's len can-only grows. Its size is therefore the maximum number of pages
22+
/// that was ever allocated. One page being 1MB long, this is not a problem.
23+
///
24+
/// If a page is not allocated, the corresponding entry is `None`.
25+
pages: Vec<Option<Page>>,
26+
/// `free_slots` slots keeps track of the pages that are not allocated.
27+
free_slots: Vec<PageId>,
28+
/// `free_page_ids` keeps track of the allocated pages that are
29+
/// available.
30+
free_page_ids: Vec<PageId>,
31+
/// Arena stats used to track how many pages should be freed.
32+
stats: ArenaStats,
33+
}
34+
35+
// The idea here is that we keep track of the maximum number of pages used through time.
36+
// To identify if it is worth deallocating pages, we look at the maximum number of pages
37+
// in use in the last few minutes minutes.
38+
//
39+
// We then allow ourselves to free memory down to this value.
40+
// Tracking exactly the maximum number of pages in use in the last 5 minutes is unnecessarily
41+
// complicated.
42+
//
43+
// For instance, we could run an extra task or thread.
44+
//
45+
// Instead, we just run a routine whenever someone interacts with the GC.
46+
// This routine only checks time 1 out of 256 calls.
47+
//
48+
// Pitfall: If pages are requests way less often than 256 times per minutes,
49+
// this arena may take way too much time to release its memory.
50+
struct ArenaStats {
51+
max_num_used_pages_former: usize,
52+
max_num_used_pages_current: usize,
53+
call_counter: u8,
54+
next_window_start: Instant,
55+
}
56+
57+
const WINDOW: Duration = Duration::from_secs(60);
58+
59+
impl Default for ArenaStats {
60+
fn default() -> ArenaStats {
61+
ArenaStats {
62+
// We arbitrarily initialize num used pages former to 100.
63+
max_num_used_pages_former: 100,
64+
max_num_used_pages_current: 0,
65+
call_counter: 0u8,
66+
next_window_start: Instant::now(),
67+
}
68+
}
69+
}
70+
71+
impl ArenaStats {
72+
/// This method happens when we are changing time window.
73+
fn roll(&mut self, now: Instant) {
74+
self.max_num_used_pages_former = self.max_num_used_pages_current;
75+
self.max_num_used_pages_current = 0;
76+
self.next_window_start = now + WINDOW;
77+
}
78+
79+
pub fn record_num_used_page(&mut self, num_used_pages: usize) -> usize {
80+
// The only function of the call counter is to avoid calling `Instant::now()`
81+
// at every single call.
82+
self.call_counter = self.call_counter.wrapping_add(1);
83+
if self.call_counter == 0u8 {
84+
let now = Instant::now();
85+
if now > self.next_window_start {
86+
self.roll(now);
87+
}
88+
}
89+
self.max_num_used_pages_current = self.max_num_used_pages_current.max(num_used_pages);
90+
self.target_num_pages()
91+
}
92+
93+
// This method returns a target number of pages.
94+
//
95+
// If we currently have a number of allocated pages higher than this, we need to free
96+
// pages until we reach this number.
97+
fn target_num_pages(&self) -> usize {
98+
let max_over_both_windows = self
99+
.max_num_used_pages_former
100+
.max(self.max_num_used_pages_current);
101+
(max_over_both_windows + 10).max(max_over_both_windows * 105 / 100)
102+
}
103+
}
104+
105+
impl Arena {
106+
/// Returns an allocated page id.
107+
pub fn get_page_id(&mut self) -> PageId {
108+
if let Some(page_id) = self.free_page_ids.pop() {
109+
assert!(self.pages[page_id.0].is_some());
110+
return page_id;
111+
}
112+
let page: Page = vec![0u8; PAGE_SIZE].into_boxed_slice();
113+
if let Some(free_slot) = self.free_slots.pop() {
114+
let slot = &mut self.pages[free_slot.0];
115+
assert!(slot.is_none());
116+
*slot = Some(page);
117+
return free_slot;
118+
} else {
119+
let new_page_id = self.pages.len();
120+
self.pages.push(Some(page));
121+
PageId(new_page_id)
122+
}
123+
}
124+
125+
#[inline]
126+
pub fn page(&self, page_id: PageId) -> &[u8] {
127+
self.pages[page_id.0].as_ref().unwrap()
128+
}
129+
130+
#[inline]
131+
pub fn page_mut(&mut self, page_id: PageId) -> &mut [u8] {
132+
self.pages[page_id.0].as_mut().unwrap()
133+
}
134+
135+
pub fn release_page(&mut self, page_id: PageId) {
136+
self.free_page_ids.push(page_id);
137+
assert!(self.pages[page_id.0].is_some());
138+
self.gc();
139+
}
140+
141+
/// `gc` releases memory by deallocating ALL of the free pages.
142+
pub fn gc(&mut self) {
143+
let num_used_pages = self.num_used_pages();
144+
let target_num_pages = self.stats.record_num_used_page(num_used_pages);
145+
let num_pages_to_free = self.num_allocated_pages().saturating_sub(target_num_pages);
146+
assert!(num_pages_to_free <= self.free_page_ids.len());
147+
for _ in 0..num_pages_to_free {
148+
let page_id = self.free_page_ids.pop().unwrap();
149+
self.pages[page_id.0] = None;
150+
self.free_slots.push(page_id);
151+
}
152+
}
153+
154+
/// Returns the number of pages that are allocated
155+
/// (regardless of whether they are in use or not).
156+
pub fn num_allocated_pages(&self) -> usize {
157+
self.pages.len() - self.free_slots.len()
158+
}
159+
160+
/// Returns the number of pages that are allocated AND actually used.
161+
pub fn num_used_pages(&self) -> usize {
162+
self.pages.len() - self.free_slots.len() - self.free_page_ids.len()
163+
}
164+
165+
pub fn capacity(&self) -> usize {
166+
self.num_allocated_pages() * PAGE_SIZE
167+
}
168+
169+
pub fn unused_capacity(&self) -> usize {
170+
self.free_page_ids.len() * PAGE_SIZE
171+
}
172+
}
173+
174+
#[cfg(test)]
175+
mod tests {
176+
use super::*;
177+
178+
#[test]
179+
fn test_arena_simple() {
180+
let mut arena = Arena::default();
181+
assert_eq!(arena.capacity(), 0);
182+
assert_eq!(arena.get_page_id(), PageId(0));
183+
assert_eq!(arena.get_page_id(), PageId(1));
184+
arena.release_page(PageId(0));
185+
assert_eq!(arena.get_page_id(), PageId(0));
186+
}
187+
188+
#[test]
189+
fn test_arena_gc() {
190+
let mut arena = Arena::default();
191+
assert_eq!(arena.capacity(), 0);
192+
assert_eq!(arena.get_page_id(), PageId(0));
193+
assert_eq!(arena.get_page_id(), PageId(1));
194+
arena.release_page(PageId(1));
195+
assert_eq!(arena.num_allocated_pages(), 2);
196+
arena.gc();
197+
assert_eq!(arena.num_allocated_pages(), 1);
198+
assert_eq!(arena.get_page_id(), PageId(1));
199+
assert_eq!(arena.num_allocated_pages(), 2);
200+
}
201+
}

src/mem/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
mod arena;
12
mod queue;
23
mod queues;
4+
mod rolling_buffer;
35

6+
use self::arena::{Arena, PAGE_SIZE};
47
pub(crate) use self::queue::MemQueue;
58
pub(crate) use self::queues::MemQueues;
9+
use self::rolling_buffer::RollingBuffer;
610

711
#[cfg(test)]
812
mod tests;

0 commit comments

Comments
 (0)