From d3e2f775c5e55903a396ce3a5b29d9a4abe1329c Mon Sep 17 00:00:00 2001 From: Carl Sverre <82591+carlsverre@users.noreply.github.com> Date: Wed, 4 Dec 2024 13:59:47 -0800 Subject: [PATCH 1/4] Improve HiddenSet ergonomics and fix a bug in the leveled compaction strategy where it didn't consider hidden segments when computing the minimal compaction job. --- src/compaction/leveled.rs | 43 +++++++++++++-------- src/compaction/tiered.rs | 2 +- src/compaction/worker.rs | 64 ++++++++++++++++---------------- src/level_manifest/hidden_set.rs | 36 ++++++++++++++++++ src/level_manifest/mod.rs | 60 +++++++++++++----------------- src/memtable/mod.rs | 2 +- 6 files changed, 125 insertions(+), 82 deletions(-) create mode 100644 src/level_manifest/hidden_set.rs diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 76449424..10c75467 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -6,7 +6,7 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput}; use crate::{ config::Config, key_range::KeyRange, - level_manifest::{level::Level, HiddenSet, LevelManifest}, + level_manifest::{level::Level, LevelManifest}, segment::Segment, HashSet, SegmentId, }; @@ -20,13 +20,33 @@ fn aggregate_key_range(segments: &[Segment]) -> KeyRange { fn pick_minimal_compaction( curr_level: &Level, next_level: &Level, - hidden_set: &HiddenSet, + levels: &LevelManifest, ) -> Option<(HashSet, bool)> { // assert!(curr_level.is_disjoint, "Lx is not disjoint"); // assert!(next_level.is_disjoint, "Lx+1 is not disjoint"); let mut choices = vec![]; + let mut add_choice = + |write_amp: f32, segment_ids: HashSet, can_trivial_move: bool| { + let mut valid_choice = true; + + // IMPORTANT: Compaction is blocked because of other + // on-going compaction + valid_choice &= !segment_ids.iter().any(|x| levels.segment_hidden(*x)); + + // NOTE: Keep compactions with 25 or less segments + // to make compactions not too large + // + // TODO: ideally, if a level has a lot of compaction debt + // compactions could be parallelized as long as they don't overlap in key range + valid_choice &= segment_ids.len() <= 25; + + if valid_choice { + choices.push((write_amp, segment_ids, can_trivial_move)); + } + }; + for size in 1..=next_level.len() { let windows = next_level.windows(size); @@ -34,7 +54,7 @@ fn pick_minimal_compaction( if window .iter() .map(|x| x.metadata.id) - .any(|x| hidden_set.contains(&x)) + .any(|x| levels.segment_hidden(x)) { // IMPORTANT: Compaction is blocked because of other // on-going compaction @@ -72,7 +92,7 @@ fn pick_minimal_compaction( if curr_level_pull_in .iter() .map(|x| x.metadata.id) - .any(|x| hidden_set.contains(&x)) + .any(|x| levels.segment_hidden(x)) { // IMPORTANT: Compaction is blocked because of other // on-going compaction @@ -93,7 +113,7 @@ fn pick_minimal_compaction( let write_amp = (next_level_size as f32) / (curr_level_size as f32); - choices.push((write_amp, segment_ids, false)); + add_choice(write_amp, segment_ids, false); } } } @@ -108,18 +128,11 @@ fn pick_minimal_compaction( let key_range = aggregate_key_range(window); if next_level.overlapping_segments(&key_range).next().is_none() { - choices.push((0.0, segment_ids, true)); + add_choice(0.0, segment_ids, true); } } } - // NOTE: Keep compactions with 25 or less segments - // to make compactions not too large - // - // TODO: ideally, if a level has a lot of compaction debt - // compactions could be parallelized as long as they don't overlap in key range - choices.retain(|(_, segments, _)| segments.len() <= 25); - let minimum_effort_choice = choices .into_iter() .min_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal)); @@ -216,7 +229,7 @@ impl CompactionStrategy for Strategy { .iter() // NOTE: Take bytes that are already being compacted into account, // otherwise we may be overcompensating - .filter(|x| !levels.hidden_set.contains(&x.metadata.id)) + .filter(|x| !levels.segment_hidden(x.metadata.id)) .map(|x| x.metadata.file_size) .sum(); @@ -230,7 +243,7 @@ impl CompactionStrategy for Strategy { }; let Some((segment_ids, can_trivial_move)) = - pick_minimal_compaction(level, next_level, &levels.hidden_set) + pick_minimal_compaction(level, next_level, levels) else { break; }; diff --git a/src/compaction/tiered.rs b/src/compaction/tiered.rs index 1c093e36..92c9990c 100644 --- a/src/compaction/tiered.rs +++ b/src/compaction/tiered.rs @@ -74,7 +74,7 @@ impl CompactionStrategy for Strategy { .iter() // NOTE: Take bytes that are already being compacted into account, // otherwise we may be overcompensating - .filter(|x| !levels.hidden_set.contains(&x.metadata.id)) + .filter(|x| !levels.segment_hidden(x.metadata.id)) .map(|x| x.metadata.file_size) .sum(); diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 2d1d6932..a8bb580a 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -73,7 +73,7 @@ impl Options { /// This will block until the compactor is fully finished. pub fn do_compaction(opts: &Options) -> crate::Result<()> { log::trace!("compactor: acquiring levels manifest lock"); - let mut original_levels = opts.levels.write().expect("lock is poisoned"); + let original_levels = opts.levels.write().expect("lock is poisoned"); log::trace!("compactor: consulting compaction strategy"); let choice = opts.strategy.choose(&original_levels, &opts.config); @@ -82,35 +82,15 @@ pub fn do_compaction(opts: &Options) -> crate::Result<()> { match choice { Choice::Merge(payload) => merge_segments(original_levels, opts, &payload), - Choice::Move(payload) => { - let segment_map = original_levels.get_all_segments(); - - original_levels.atomic_swap(|recipe| { - for segment_id in payload.segment_ids { - if let Some(segment) = segment_map.get(&segment_id).cloned() { - for level in recipe.iter_mut() { - level.remove(segment_id); - } - - recipe - .get_mut(payload.dest_level as usize) - .expect("destination level should exist") - .insert(segment); - } - } - }) - } - Choice::Drop(payload) => { - drop_segments( - original_levels, - opts, - &payload - .into_iter() - .map(|x| (opts.tree_id, x).into()) - .collect::>(), - )?; - Ok(()) - } + Choice::Move(payload) => move_segments(original_levels, payload), + Choice::Drop(payload) => drop_segments( + original_levels, + opts, + &payload + .into_iter() + .map(|x| (opts.tree_id, x).into()) + .collect::>(), + ), Choice::DoNothing => { log::trace!("Compactor chose to do nothing"); Ok(()) @@ -186,6 +166,28 @@ fn create_compaction_stream<'a>( } } +fn move_segments( + mut levels: RwLockWriteGuard<'_, LevelManifest>, + payload: CompactionPayload, +) -> crate::Result<()> { + let segment_map = levels.get_all_segments(); + + levels.atomic_swap(|recipe| { + for segment_id in payload.segment_ids { + if let Some(segment) = segment_map.get(&segment_id).cloned() { + for level in recipe.iter_mut() { + level.remove(segment_id); + } + + recipe + .get_mut(payload.dest_level as usize) + .expect("destination level should exist") + .insert(segment); + } + } + }) +} + #[allow(clippy::too_many_lines)] fn merge_segments( mut levels: RwLockWriteGuard<'_, LevelManifest>, @@ -202,7 +204,7 @@ fn merge_segments( if payload .segment_ids .iter() - .any(|id| levels.hidden_set.contains(id)) + .any(|id| levels.segment_hidden(*id)) { log::warn!("Compaction task contained hidden segments, declining to run it"); return Ok(()); diff --git a/src/level_manifest/hidden_set.rs b/src/level_manifest/hidden_set.rs new file mode 100644 index 00000000..41045c7a --- /dev/null +++ b/src/level_manifest/hidden_set.rs @@ -0,0 +1,36 @@ +use crate::segment::meta::SegmentId; + +use crate::HashSet; + +#[derive(Clone)] +pub(super) struct HiddenSet { + pub(crate) set: HashSet, +} + +impl Default for HiddenSet { + fn default() -> Self { + Self { + set: HashSet::with_capacity_and_hasher(10, xxhash_rust::xxh3::Xxh3Builder::new()), + } + } +} + +impl HiddenSet { + pub(crate) fn hide>(&mut self, keys: T) { + self.set.extend(keys); + } + + pub(crate) fn show>(&mut self, keys: T) { + for key in keys { + self.set.remove(&key); + } + } + + pub(crate) fn contains(&self, key: SegmentId) -> bool { + self.set.contains(&key) + } + + pub(crate) fn is_empty(&self) -> bool { + self.set.is_empty() + } +} diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index bf4e25cd..e75b6860 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -2,6 +2,7 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) +mod hidden_set; pub mod iter; pub(crate) mod level; @@ -21,8 +22,6 @@ use std::{ sync::Arc, }; -pub type HiddenSet = HashSet; - type Levels = Vec>; /// Represents the levels of a log-structured merge tree. @@ -38,7 +37,7 @@ pub struct LevelManifest { /// /// While consuming segments (because of compaction) they will not appear in the list of segments /// as to not cause conflicts between multiple compaction threads (compacting the same segments) - pub hidden_set: HiddenSet, + hidden_set: hidden_set::HiddenSet, is_disjoint: bool, } @@ -62,7 +61,7 @@ impl std::fmt::Display for LevelManifest { #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().take(2) { let id = segment.metadata.id; - let is_hidden = self.hidden_set.contains(&id); + let is_hidden = self.segment_hidden(id); write!( f, @@ -76,7 +75,7 @@ impl std::fmt::Display for LevelManifest { #[allow(clippy::indexing_slicing)] for segment in level.segments.iter().rev().take(2).rev() { let id = segment.metadata.id; - let is_hidden = self.hidden_set.contains(&id); + let is_hidden = self.segment_hidden(id); write!( f, @@ -88,7 +87,7 @@ impl std::fmt::Display for LevelManifest { } else { for segment in &level.segments { let id = segment.metadata.id; - let is_hidden = self.hidden_set.contains(&id); + let is_hidden = self.segment_hidden(id); write!( f, @@ -126,10 +125,7 @@ impl LevelManifest { let mut manifest = Self { path: path.as_ref().to_path_buf(), levels, - hidden_set: HashSet::with_capacity_and_hasher( - 10, - xxhash_rust::xxh3::Xxh3Builder::new(), - ), + hidden_set: Default::default(), is_disjoint: true, }; Self::write_to_disk(path, &manifest.deep_clone())?; @@ -235,10 +231,7 @@ impl LevelManifest { let mut manifest = Self { levels, - hidden_set: HashSet::with_capacity_and_hasher( - 10, - xxhash_rust::xxh3::Xxh3Builder::new(), - ), + hidden_set: Default::default(), path: path.as_ref().to_path_buf(), is_disjoint: false, }; @@ -379,14 +372,10 @@ impl LevelManifest { HashSet::with_capacity_and_hasher(self.len(), xxhash_rust::xxh3::Xxh3Builder::new()); for (idx, level) in self.levels.iter().enumerate() { - for segment_id in level.ids() { - if self.hidden_set.contains(&segment_id) { - // NOTE: Level count is u8 - #[allow(clippy::cast_possible_truncation)] - let idx = idx as u8; - - output.insert(idx); - } + if level.ids().any(|id| self.segment_hidden(id)) { + // NOTE: Level count is u8 + #[allow(clippy::cast_possible_truncation)] + output.insert(idx as u8); } } @@ -400,7 +389,7 @@ impl LevelManifest { for raw_level in &self.levels { let mut level = raw_level.iter().cloned().collect::>(); - level.retain(|x| !self.hidden_set.contains(&x.metadata.id)); + level.retain(|x| !self.segment_hidden(x.metadata.id)); output.push(Level { segments: level, @@ -425,16 +414,16 @@ impl LevelManifest { output } - pub(crate) fn show_segments(&mut self, keys: impl Iterator) { - for key in keys { - self.hidden_set.remove(&key); - } + pub(crate) fn segment_hidden(&self, key: SegmentId) -> bool { + self.hidden_set.contains(key) } - pub(crate) fn hide_segments(&mut self, keys: impl Iterator) { - for key in keys { - self.hidden_set.insert(key); - } + pub(crate) fn hide_segments>(&mut self, keys: T) { + self.hidden_set.hide(keys); + } + + pub(crate) fn show_segments>(&mut self, keys: T) { + self.hidden_set.show(keys); } } @@ -464,8 +453,11 @@ impl Encode for Vec { #[cfg(test)] #[allow(clippy::expect_used)] mod tests { - use crate::{coding::Encode, level_manifest::LevelManifest, AbstractTree}; - use std::collections::HashSet; + use crate::{ + coding::Encode, + level_manifest::{hidden_set::HiddenSet, LevelManifest}, + AbstractTree, + }; use test_log::test; #[test] @@ -513,7 +505,7 @@ mod tests { #[test] fn level_manifest_raw_empty() -> crate::Result<()> { let manifest = LevelManifest { - hidden_set: HashSet::default(), + hidden_set: HiddenSet::default(), levels: Vec::default(), path: "a".into(), is_disjoint: false, diff --git a/src/memtable/mod.rs b/src/memtable/mod.rs index 07f9b204..a5b390f1 100644 --- a/src/memtable/mod.rs +++ b/src/memtable/mod.rs @@ -50,7 +50,7 @@ impl Memtable { pub(crate) fn range<'a, R: RangeBounds + 'a>( &'a self, range: R, - ) -> impl DoubleEndedIterator + '_ { + ) -> impl DoubleEndedIterator + 'a { self.items.range(range).map(|entry| InternalValue { key: entry.key().clone(), value: entry.value().clone(), From d98c0ebc173cd45ef063adc18fd5429bcdf5f73e Mon Sep 17 00:00:00 2001 From: Carl Sverre <82591+carlsverre@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:18:59 -0800 Subject: [PATCH 2/4] pass hidden set directly to pick_minimal_computation, but still keep it somewhat restricted --- src/compaction/leveled.rs | 12 ++++++------ src/level_manifest/hidden_set.rs | 6 +++--- src/level_manifest/mod.rs | 7 ++++++- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/compaction/leveled.rs b/src/compaction/leveled.rs index 10c75467..18bf88b2 100644 --- a/src/compaction/leveled.rs +++ b/src/compaction/leveled.rs @@ -6,7 +6,7 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput}; use crate::{ config::Config, key_range::KeyRange, - level_manifest::{level::Level, LevelManifest}, + level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest}, segment::Segment, HashSet, SegmentId, }; @@ -20,7 +20,7 @@ fn aggregate_key_range(segments: &[Segment]) -> KeyRange { fn pick_minimal_compaction( curr_level: &Level, next_level: &Level, - levels: &LevelManifest, + hidden_set: &HiddenSet, ) -> Option<(HashSet, bool)> { // assert!(curr_level.is_disjoint, "Lx is not disjoint"); // assert!(next_level.is_disjoint, "Lx+1 is not disjoint"); @@ -33,7 +33,7 @@ fn pick_minimal_compaction( // IMPORTANT: Compaction is blocked because of other // on-going compaction - valid_choice &= !segment_ids.iter().any(|x| levels.segment_hidden(*x)); + valid_choice &= !segment_ids.iter().any(|x| hidden_set.contains(*x)); // NOTE: Keep compactions with 25 or less segments // to make compactions not too large @@ -54,7 +54,7 @@ fn pick_minimal_compaction( if window .iter() .map(|x| x.metadata.id) - .any(|x| levels.segment_hidden(x)) + .any(|x| hidden_set.contains(x)) { // IMPORTANT: Compaction is blocked because of other // on-going compaction @@ -92,7 +92,7 @@ fn pick_minimal_compaction( if curr_level_pull_in .iter() .map(|x| x.metadata.id) - .any(|x| levels.segment_hidden(x)) + .any(|x| hidden_set.contains(x)) { // IMPORTANT: Compaction is blocked because of other // on-going compaction @@ -243,7 +243,7 @@ impl CompactionStrategy for Strategy { }; let Some((segment_ids, can_trivial_move)) = - pick_minimal_compaction(level, next_level, levels) + pick_minimal_compaction(level, next_level, levels.hidden_segments()) else { break; }; diff --git a/src/level_manifest/hidden_set.rs b/src/level_manifest/hidden_set.rs index 41045c7a..f0b05c42 100644 --- a/src/level_manifest/hidden_set.rs +++ b/src/level_manifest/hidden_set.rs @@ -3,7 +3,7 @@ use crate::segment::meta::SegmentId; use crate::HashSet; #[derive(Clone)] -pub(super) struct HiddenSet { +pub(crate) struct HiddenSet { pub(crate) set: HashSet, } @@ -16,11 +16,11 @@ impl Default for HiddenSet { } impl HiddenSet { - pub(crate) fn hide>(&mut self, keys: T) { + pub(super) fn hide>(&mut self, keys: T) { self.set.extend(keys); } - pub(crate) fn show>(&mut self, keys: T) { + pub(super) fn show>(&mut self, keys: T) { for key in keys { self.set.remove(&key); } diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index e75b6860..1a58908e 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -2,7 +2,7 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -mod hidden_set; +pub(crate) mod hidden_set; pub mod iter; pub(crate) mod level; @@ -14,6 +14,7 @@ use crate::{ HashMap, HashSet, }; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use hidden_set::HiddenSet; use iter::LevelManifestIterator; use level::Level; use std::{ @@ -418,6 +419,10 @@ impl LevelManifest { self.hidden_set.contains(key) } + pub(crate) fn hidden_segments(&self) -> &HiddenSet { + &self.hidden_set + } + pub(crate) fn hide_segments>(&mut self, keys: T) { self.hidden_set.hide(keys); } From 186fd9feb176da3447d45b206cb9179cc7a3b6e7 Mon Sep 17 00:00:00 2001 From: Marvin <33938500+marvin-j97@users.noreply.github.com> Date: Wed, 4 Dec 2024 23:22:14 +0100 Subject: [PATCH 3/4] Update hidden_set.rs --- src/level_manifest/hidden_set.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/level_manifest/hidden_set.rs b/src/level_manifest/hidden_set.rs index f0b05c42..2aa34e6a 100644 --- a/src/level_manifest/hidden_set.rs +++ b/src/level_manifest/hidden_set.rs @@ -1,7 +1,12 @@ use crate::segment::meta::SegmentId; - use crate::HashSet; +/// The hidden set keeps track of which segments are currently being compacted +/// +/// When a segment is hidden (being compacted), no other compaction task can include that +/// segment, or it will be declined to be run. +/// +/// If a compaction task fails, the segments are shown again (removed from the hidden set). #[derive(Clone)] pub(crate) struct HiddenSet { pub(crate) set: HashSet, From 891011cd31134676db189b440d68cf32222352a2 Mon Sep 17 00:00:00 2001 From: Marvin <33938500+marvin-j97@users.noreply.github.com> Date: Wed, 4 Dec 2024 23:29:01 +0100 Subject: [PATCH 4/4] Update mod.rs --- src/level_manifest/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/level_manifest/mod.rs b/src/level_manifest/mod.rs index 1a58908e..18802a97 100644 --- a/src/level_manifest/mod.rs +++ b/src/level_manifest/mod.rs @@ -25,20 +25,20 @@ use std::{ type Levels = Vec>; -/// Represents the levels of a log-structured merge tree. +/// Represents the levels of a log-structured merge tree pub struct LevelManifest { - /// Path of level manifest file + /// Path of level manifest file. path: PathBuf, - /// Actual levels containing segments + /// Actual levels containing segments. #[doc(hidden)] pub levels: Levels, - /// Set of segment IDs that are masked + /// Set of segment IDs that are masked. /// /// While consuming segments (because of compaction) they will not appear in the list of segments - /// as to not cause conflicts between multiple compaction threads (compacting the same segments) - hidden_set: hidden_set::HiddenSet, + /// as to not cause conflicts between multiple compaction threads (compacting the same segments). + hidden_set: HiddenSet, is_disjoint: bool, }