diff --git a/benches/memtable.rs b/benches/memtable.rs
index 41e3ad6c..fedf2d14 100644
--- a/benches/memtable.rs
+++ b/benches/memtable.rs
@@ -3,22 +3,42 @@ use lsm_tree::{InternalValue, Memtable};
 use nanoid::nanoid;
 
 fn memtable_get_upper_bound(c: &mut Criterion) {
-    let memtable = Memtable::default();
+    c.bench_function("memtable get", |b| {
+        let memtable = Memtable::default();
 
-    for _ in 0..1_000_000 {
-        memtable.insert(InternalValue::from_components(
-            format!("abc_{}", nanoid!()).as_bytes(),
-            vec![],
-            0,
-            lsm_tree::ValueType::Value,
-        ));
-    }
+        for _ in 0..1_000_000 {
+            memtable.insert(InternalValue::from_components(
+                format!("abc_{}", nanoid!()).as_bytes(),
+                vec![],
+                0,
+                lsm_tree::ValueType::Value,
+            ));
+        }
 
-    c.bench_function("memtable get", |b| {
         b.iter(|| {
             memtable.get("abc", None);
         });
     });
 }
 
-criterion_group!(benches, memtable_get_upper_bound);
+
+fn memtable_highest_seqno(c: &mut Criterion) {
+    c.bench_function("memtable highest seqno", |b| {
+        let memtable = Memtable::default();
+
+        for x in 0..100_000 {
+            memtable.insert(InternalValue::from_components(
+                format!("abc_{}", nanoid!()).as_bytes(),
+                vec![],
+                x,
+                lsm_tree::ValueType::Value,
+            ));
+        }
+
+        b.iter(|| {
+            assert_eq!(Some(99_999), memtable.get_highest_seqno());
+        });
+    });
+}
+
+criterion_group!(benches, memtable_get_upper_bound, memtable_highest_seqno);
 criterion_main!(benches);
diff --git a/src/memtable/mod.rs b/src/memtable/mod.rs
index 918a062c..b0abb51a 100644
--- a/src/memtable/mod.rs
+++ b/src/memtable/mod.rs
@@ -7,24 +7,33 @@ use crate::segment::block::ItemSize;
 use crate::value::{InternalValue, SeqNo, UserValue, ValueType};
 use crossbeam_skiplist::SkipMap;
 use std::ops::RangeBounds;
-use std::sync::atomic::AtomicU32;
+use std::sync::atomic::{AtomicU32, AtomicU64};
 
-/// The memtable serves as an intermediary storage for new items
+/// The memtable serves as an intermediary, ephemeral, sorted storage for new items
+///
+/// When the Memtable exceeds some size, it should be flushed to a disk segment.
 #[derive(Default)]
 pub struct Memtable {
+    /// The actual content, stored in a lock-free skiplist.
     #[doc(hidden)]
    pub items: SkipMap<InternalKey, UserValue>,
 
-    /// Approximate active memtable size
+    /// Approximate active memtable size.
     ///
-    /// If this grows too large, a flush is triggered
+    /// If this grows too large, a flush is triggered.
     pub(crate) approximate_size: AtomicU32,
+
+    /// Highest encountered sequence number.
+    ///
+    /// This is used so that `get_highest_seqno` has O(1) complexity.
+    pub(crate) highest_seqno: AtomicU64,
 }
 
 impl Memtable {
     /// Clears the memtable.
     pub fn clear(&mut self) {
         self.items.clear();
+        self.highest_seqno = AtomicU64::new(0);
         self.approximate_size
             .store(0, std::sync::atomic::Ordering::Release);
     }
@@ -126,18 +135,22 @@ impl Memtable {
         let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type);
         self.items.insert(key, item.value);
 
+        self.highest_seqno
+            .fetch_max(item.key.seqno, std::sync::atomic::Ordering::AcqRel);
+
         (item_size, size_before + item_size)
     }
 
     /// Returns the highest sequence number in the memtable.
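+    ///
+    /// Thanks to the tracked `highest_seqno` counter, this is a single atomic
+    /// load instead of a scan over the whole skiplist.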
     pub fn get_highest_seqno(&self) -> Option<SeqNo> {
-        self.items
-            .iter()
-            .map(|x| {
-                let key = x.key();
-                key.seqno
-            })
-            .max()
+        if self.is_empty() {
+            None
+        } else {
+            Some(
+                self.highest_seqno
+                    .load(std::sync::atomic::Ordering::Acquire),
+            )
+        }
     }
 }
diff --git a/src/segment/range.rs b/src/segment/range.rs
index af2c8378..1a5169cf 100644
--- a/src/segment/range.rs
+++ b/src/segment/range.rs
@@ -19,7 +19,8 @@ use std::sync::Arc;
 pub struct Range {
     block_index: Arc<BlockIndex>,
 
-    is_initialized: bool,
+    lo_initialized: bool,
+    hi_initialized: bool,
 
     pub(crate) range: (Bound<UserKey>, Bound<UserKey>),
 
@@ -45,7 +46,8 @@ impl Range {
         );
 
         Self {
-            is_initialized: false,
+            lo_initialized: false,
+            hi_initialized: false,
 
             block_index,
 
@@ -75,9 +77,13 @@ impl Range {
                 Some(start)
             }
         };
+
         if let Some(key) = start_key.cloned() {
             self.reader.set_lower_bound(key);
         }
+
+        self.lo_initialized = true;
+
         Ok(())
     }
 
@@ -107,19 +113,8 @@ impl Range {
         if let Some(key) = end_key.cloned() {
             self.reader.set_upper_bound(key);
         }
-        Ok(())
-    }
-
-    fn initialize(&mut self) -> crate::Result<()> {
-        // TODO: can we skip searching for lower bound until next is called at least once...?
-        // would make short ranges 1.5-2x faster (if cache miss) if only one direction is used
-        self.initialize_lo_bound()?;
-
-        // TODO: can we skip searching for upper bound until next_back is called at least once...?
-        // would make short ranges 1.5-2x faster (if cache miss) if only one direction is used
-        self.initialize_hi_bound()?;
-
-        self.is_initialized = true;
+        self.hi_initialized = true;
 
         Ok(())
     }
@@ -129,8 +124,8 @@ impl Iterator for Range {
     type Item = crate::Result<InternalValue>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if !self.is_initialized {
-            if let Err(e) = self.initialize() {
+        if !self.lo_initialized {
+            if let Err(e) = self.initialize_lo_bound() {
                 return Some(Err(e));
             };
         }
@@ -182,16 +177,14 @@ impl DoubleEndedIterator for Range {
     fn next_back(&mut self) -> Option<Self::Item> {
-        if !self.is_initialized {
-            if let Err(e) = self.initialize() {
+        if !self.hi_initialized {
+            if let Err(e) = self.initialize_hi_bound() {
                 return Some(Err(e));
             };
         }
 
         loop {
-            let entry_result = self.reader.next_back()?;
-
-            match entry_result {
+            match self.reader.next_back()? {
                 Ok(entry) => {
                     match self.range.start_bound() {
                         Bound::Included(start) => {
diff --git a/tests/segment_range.rs b/tests/segment_range.rs
index 6acbba00..3fa6b3ff 100644
--- a/tests/segment_range.rs
+++ b/tests/segment_range.rs
@@ -48,3 +48,83 @@ fn segment_ranges() -> lsm_tree::Result<()> {
 
     Ok(())
 }
+
+#[test]
+fn segment_range_last_back() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?.into_path();
+
+    let tree = Config::new(folder)
+        .data_block_size(1_024)
+        .index_block_size(1_024)
+        .open()?;
+
+    let value = (0..2_000).map(|_| 0).collect::<Vec<u8>>();
+
+    for x in 0..10_u64 {
+        let key = x.to_be_bytes();
+        tree.insert(key, &value, 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
+    assert_eq!(10, iter.count());
+
+    let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
+    assert_eq!(10, iter.rev().count());
+
+    let mut iter = tree.range(0u64.to_be_bytes()..5u64.to_be_bytes());
+
+    assert_eq!(0u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(1u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(2u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(3u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(4u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert!(iter.next_back().is_none());
+
+    Ok(())
+}
+
+#[test]
+fn segment_range_last_back_2() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?.into_path();
+
+    let tree = Config::new(folder)
+        .data_block_size(1_024)
+        .index_block_size(1_024)
+        .open()?;
+
+    let value = (0..2_000).map(|_| 0).collect::<Vec<u8>>();
+
+    for x in 0..10_u64 {
+        let key = x.to_be_bytes();
+        tree.insert(key, &value, 0);
+    }
+    tree.insert(10u64.to_be_bytes(), [], 0);
+    tree.insert(11u64.to_be_bytes(), [], 0);
+    tree.flush_active_memtable(0)?;
+
+    let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
+    assert_eq!(10, iter.count());
+
+    let iter = tree.range(0u64.to_be_bytes()..10u64.to_be_bytes());
+    assert_eq!(10, iter.rev().count());
+
+    let mut iter = tree.range(0u64.to_be_bytes()..12u64.to_be_bytes());
+
+    assert_eq!(0u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(1u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(2u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(3u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(4u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(5u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(6u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(7u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(8u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(9u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(10u64.to_be_bytes(), &*iter.next().unwrap().unwrap().0);
+    assert_eq!(11u64.to_be_bytes(), &*iter.next_back().unwrap().unwrap().0);
+    assert!(iter.next().is_none());
+    assert!(iter.next_back().is_none());
+
+    Ok(())
+}