Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve HiddenSet ergonomics and fix a bug in the leveled compaction strategy #88

Merged
merged 5 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions src/compaction/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
config::Config,
key_range::KeyRange,
level_manifest::{level::Level, HiddenSet, LevelManifest},
level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
segment::Segment,
HashSet, SegmentId,
};
Expand All @@ -28,14 +28,34 @@ fn pick_minimal_compaction(

let mut choices = vec![];

let mut add_choice =
|write_amp: f32, segment_ids: HashSet<SegmentId>, can_trivial_move: bool| {
let mut valid_choice = true;

// IMPORTANT: Compaction is blocked because of other
// on-going compaction
valid_choice &= !segment_ids.iter().any(|x| hidden_set.contains(*x));

// NOTE: Keep compactions with 25 or less segments
// to make compactions not too large
//
// TODO: ideally, if a level has a lot of compaction debt
// compactions could be parallelized as long as they don't overlap in key range
valid_choice &= segment_ids.len() <= 25;

if valid_choice {
choices.push((write_amp, segment_ids, can_trivial_move));
}
};

for size in 1..=next_level.len() {
let windows = next_level.windows(size);

for window in windows {
if window
.iter()
.map(|x| x.metadata.id)
.any(|x| hidden_set.contains(&x))
.any(|x| hidden_set.contains(x))
{
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
Expand Down Expand Up @@ -73,7 +93,7 @@ fn pick_minimal_compaction(
if curr_level_pull_in
.iter()
.map(|x| x.metadata.id)
.any(|x| hidden_set.contains(&x))
.any(|x| hidden_set.contains(x))
{
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
Expand All @@ -95,7 +115,7 @@ fn pick_minimal_compaction(

let write_amp = (next_level_size as f32) / (curr_level_size as f32);

choices.push((write_amp, segment_ids, false));
add_choice(write_amp, segment_ids, false);
}
}
}
Expand All @@ -110,18 +130,11 @@ fn pick_minimal_compaction(
let key_range = aggregate_key_range(window);

if next_level.overlapping_segments(&key_range).next().is_none() {
choices.push((0.0, segment_ids, true));
add_choice(0.0, segment_ids, true);
}
}
}

// NOTE: Keep compactions with 25 or less segments
// to make compactions not too large
//
// TODO: ideally, if a level has a lot of compaction debt
// compactions could be parallelized as long as they don't overlap in key range
choices.retain(|(_, segments, _)| segments.len() <= 25);

let minimum_effort_choice = choices
.into_iter()
.min_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
Expand Down Expand Up @@ -218,7 +231,7 @@ impl CompactionStrategy for Strategy {
.iter()
// NOTE: Take bytes that are already being compacted into account,
// otherwise we may be overcompensating
.filter(|x| !levels.hidden_set.contains(&x.metadata.id))
.filter(|x| !levels.segment_hidden(x.metadata.id))
.map(|x| x.metadata.file_size)
.sum();

Expand All @@ -232,7 +245,7 @@ impl CompactionStrategy for Strategy {
};

let Some((segment_ids, can_trivial_move)) =
pick_minimal_compaction(level, next_level, &levels.hidden_set, overshoot)
pick_minimal_compaction(level, next_level, levels.hidden_segments(), overshoot)
else {
break;
};
Expand Down
2 changes: 1 addition & 1 deletion src/compaction/tiered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl CompactionStrategy for Strategy {
.iter()
// NOTE: Take bytes that are already being compacted into account,
// otherwise we may be overcompensating
.filter(|x| !levels.hidden_set.contains(&x.metadata.id))
.filter(|x| !levels.segment_hidden(x.metadata.id))
.map(|x| x.metadata.file_size)
.sum();

Expand Down
64 changes: 33 additions & 31 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Options {
/// This will block until the compactor is fully finished.
pub fn do_compaction(opts: &Options) -> crate::Result<()> {
log::trace!("compactor: acquiring levels manifest lock");
let mut original_levels = opts.levels.write().expect("lock is poisoned");
let original_levels = opts.levels.write().expect("lock is poisoned");

log::trace!("compactor: consulting compaction strategy");
let choice = opts.strategy.choose(&original_levels, &opts.config);
Expand All @@ -82,35 +82,15 @@ pub fn do_compaction(opts: &Options) -> crate::Result<()> {

match choice {
Choice::Merge(payload) => merge_segments(original_levels, opts, &payload),
Choice::Move(payload) => {
let segment_map = original_levels.get_all_segments();

original_levels.atomic_swap(|recipe| {
for segment_id in payload.segment_ids {
if let Some(segment) = segment_map.get(&segment_id).cloned() {
for level in recipe.iter_mut() {
level.remove(segment_id);
}

recipe
.get_mut(payload.dest_level as usize)
.expect("destination level should exist")
.insert(segment);
}
}
})
}
Choice::Drop(payload) => {
drop_segments(
original_levels,
opts,
&payload
.into_iter()
.map(|x| (opts.tree_id, x).into())
.collect::<Vec<_>>(),
)?;
Ok(())
}
Choice::Move(payload) => move_segments(original_levels, payload),
Choice::Drop(payload) => drop_segments(
original_levels,
opts,
&payload
.into_iter()
.map(|x| (opts.tree_id, x).into())
.collect::<Vec<_>>(),
),
Choice::DoNothing => {
log::trace!("Compactor chose to do nothing");
Ok(())
Expand Down Expand Up @@ -186,6 +166,28 @@ fn create_compaction_stream<'a>(
}
}

fn move_segments(
mut levels: RwLockWriteGuard<'_, LevelManifest>,
payload: CompactionPayload,
) -> crate::Result<()> {
let segment_map = levels.get_all_segments();

levels.atomic_swap(|recipe| {
for segment_id in payload.segment_ids {
if let Some(segment) = segment_map.get(&segment_id).cloned() {
for level in recipe.iter_mut() {
level.remove(segment_id);
}

recipe
.get_mut(payload.dest_level as usize)
.expect("destination level should exist")
.insert(segment);
}
}
})
}

#[allow(clippy::too_many_lines)]
fn merge_segments(
mut levels: RwLockWriteGuard<'_, LevelManifest>,
Expand All @@ -202,7 +204,7 @@ fn merge_segments(
if payload
.segment_ids
.iter()
.any(|id| levels.hidden_set.contains(id))
.any(|id| levels.segment_hidden(*id))
{
log::warn!("Compaction task contained hidden segments, declining to run it");
return Ok(());
Expand Down
41 changes: 41 additions & 0 deletions src/level_manifest/hidden_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use crate::segment::meta::SegmentId;
use crate::HashSet;

/// The hidden set keeps track of which segments are currently being compacted
///
/// When a segment is hidden (being compacted), no other compaction task can include that
/// segment, or it will be declined to be run.
///
/// If a compaction task fails, the segments are shown again (removed from the hidden set).
#[derive(Clone)]
pub(crate) struct HiddenSet {
pub(crate) set: HashSet<SegmentId>,
}

impl Default for HiddenSet {
fn default() -> Self {
Self {
set: HashSet::with_capacity_and_hasher(10, xxhash_rust::xxh3::Xxh3Builder::new()),
}
}
}

impl HiddenSet {
pub(super) fn hide<T: IntoIterator<Item = SegmentId>>(&mut self, keys: T) {
self.set.extend(keys);
}

pub(super) fn show<T: IntoIterator<Item = SegmentId>>(&mut self, keys: T) {
for key in keys {
self.set.remove(&key);
}
}

pub(crate) fn contains(&self, key: SegmentId) -> bool {
self.set.contains(&key)
}

pub(crate) fn is_empty(&self) -> bool {
self.set.is_empty()
}
}
Loading
Loading