Skip to content

Commit

Permalink
Merge pull request #83 from fjall-rs/leveled-parallel
Browse files Browse the repository at this point in the history
Parallel compactions for Leveled compaction
  • Loading branch information
marvin-j97 authored Dec 5, 2024
2 parents 03c68c7 + 85a3ff7 commit 38f3648
Show file tree
Hide file tree
Showing 18 changed files with 436 additions and 299 deletions.
15 changes: 8 additions & 7 deletions src/compaction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl Strategy {
}

impl CompactionStrategy for Strategy {
/// Returns the static, human-readable name of this compaction strategy.
fn get_name(&self) -> &'static str {
"FifoStrategy"
}

fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
let resolved_view = levels.resolved_view();

Expand All @@ -58,11 +62,8 @@ impl CompactionStrategy for Strategy {
let lifetime_sec = lifetime_us / 1000 / 1000;

if lifetime_sec > ttl_seconds.into() {
log::warn!(
"segment is older than configured TTL: {:?}",
segment.metadata.id,
);
segment_ids_to_delete.insert(segment.metadata.id);
log::warn!("segment is older than configured TTL: {:?}", segment.id(),);
segment_ids_to_delete.insert(segment.id());
}
}
}
Expand All @@ -86,11 +87,11 @@ impl CompactionStrategy for Strategy {

bytes_to_delete = bytes_to_delete.saturating_sub(segment.metadata.file_size);

segment_ids_to_delete.insert(segment.metadata.id);
segment_ids_to_delete.insert(segment.id());

log::debug!(
"dropping segment to reach configured size limit: {:?}",
segment.metadata.id,
segment.id(),
);
}
}
Expand Down
170 changes: 111 additions & 59 deletions src/compaction/leveled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,64 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
config::Config,
key_range::KeyRange,
level_manifest::{level::Level, LevelManifest},
level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
segment::Segment,
HashSet, SegmentId,
};

/// Computes one key range from the key ranges of all given segments.
///
/// Each segment contributes its `metadata.key_range`; the ranges are
/// combined by `KeyRange::aggregate`.
fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
    let ranges = segments.iter().map(|segment| &segment.metadata.key_range);
    KeyRange::aggregate(ranges)
}

// TODO: Currently does not take in `overshoot`
// TODO: Need to make sure compactions are not too small
fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<SegmentId>, bool) {
/// Tries to find the optimal compaction set for merging
/// one level into the next.
fn pick_minimal_compaction(
curr_level: &Level,
next_level: &Level,
hidden_set: &HiddenSet,
overshoot: u64,
) -> Option<(HashSet<SegmentId>, bool)> {
// assert!(curr_level.is_disjoint, "Lx is not disjoint");
// assert!(next_level.is_disjoint, "Lx+1 is not disjoint");

struct Choice {
write_amp: f32,
segment_ids: HashSet<SegmentId>,
can_trivial_move: bool,
}

let mut choices = vec![];

let mut add_choice = |choice: Choice| {
let mut valid_choice = true;

// IMPORTANT: Compaction is blocked because of other
// on-going compaction
valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));

// NOTE: Keep compactions with 25 or fewer segments
// to prevent compactions from becoming too large
//
// TODO: ideally, if a level has a lot of compaction debt
// compactions could be parallelized as long as they don't overlap in key range
valid_choice &= choice.segment_ids.len() <= 25;

if valid_choice {
choices.push(choice);
}
};

for size in 1..=next_level.len() {
let windows = next_level.windows(size);

for window in windows {
if hidden_set.is_blocked(window.iter().map(Segment::id)) {
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
continue;
}

let key_range = aggregate_key_range(window);

// Pull in all segments in current level into compaction
Expand Down Expand Up @@ -55,21 +92,32 @@ fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<Segm
curr_level.overlapping_segments(&key_range).collect()
};

if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
continue;
}

let curr_level_size = curr_level_pull_in
.iter()
.map(|x| x.metadata.file_size)
.sum::<u64>();

// NOTE: Only consider compactions where we actually do some merging
if curr_level_size > 0 {
// NOTE: Only consider compactions where we actually reach the amount
// of bytes we need to merge
if curr_level_size >= overshoot {
let next_level_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();

let mut segment_ids: HashSet<_> = window.iter().map(|x| x.metadata.id).collect();
segment_ids.extend(curr_level_pull_in.iter().map(|x| x.metadata.id));
let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));

let write_amp = (next_level_size as f32) / (curr_level_size as f32);

choices.push((write_amp, segment_ids, false));
add_choice(Choice {
write_amp,
segment_ids,
can_trivial_move: false,
});
}
}
}
Expand All @@ -79,39 +127,36 @@ fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<Segm
let windows = curr_level.windows(size);

for window in windows {
let segment_ids: HashSet<SegmentId> = window.iter().map(|x| x.metadata.id).collect();
let segment_ids: HashSet<SegmentId> = window.iter().map(Segment::id).collect();

let key_range = aggregate_key_range(window);

if next_level.overlapping_segments(&key_range).next().is_none() {
choices.push((0.0, segment_ids, true));
add_choice(Choice {
write_amp: 0.0,
segment_ids,
can_trivial_move: true,
});
}
}
}

// NOTE: Keep compactions with 25 or fewer segments
// to prevent compactions from becoming too large
//
// TODO: ideally, if a level has a lot of compaction debt
// compactions could be parallelized as long as they don't overlap in key range
choices.retain(|(_, segments, _)| segments.len() <= 25);
let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
a.write_amp
.partial_cmp(&b.write_amp)
.unwrap_or(std::cmp::Ordering::Equal)
});

let minimum_effort_choice = choices
.into_iter()
.min_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));

let (_, set, can_trivial_move) = minimum_effort_choice.expect("should exist");

(set, can_trivial_move)
minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
}

/// Levelled compaction strategy (LCS)
///
/// If a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
///
/// Each level Ln for n >= 1 can have up to ratio^n segments.
/// Each level Ln for n >= 1 can hold up to `level_base_size * ratio^(n - 1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read & space amplification.
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
Expand Down Expand Up @@ -162,6 +207,14 @@ impl Default for Strategy {
}

impl Strategy {
/// Calculates the level target size.
///
/// L1 = `level_base_size`
///
/// L2 = `level_base_size * ratio`
///
/// L3 = `level_base_size * ratio * ratio`
/// ...
fn level_target_size(&self, level_idx: u8) -> u64 {
assert!(level_idx >= 1, "level_target_size does not apply to L0");

Expand All @@ -172,25 +225,16 @@ impl Strategy {
}

impl CompactionStrategy for Strategy {
/// Returns the static, human-readable name of this compaction strategy.
fn get_name(&self) -> &'static str {
"LeveledStrategy"
}

#[allow(clippy::too_many_lines)]
fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let resolved_view = levels.resolved_view();
let view = &levels.levels;

// If there are any levels that already have a compactor working on them,
// we can't touch those, because that could cause a race condition
// violating the leveled compaction invariant of having a single sorted
// run per level
//
// TODO: However, this can probably be improved by checking that two compaction
// workers just don't cross key ranges
let busy_levels = levels.busy_levels();

for (curr_level_index, level) in resolved_view
.iter()
.enumerate()
.skip(1)
.take(resolved_view.len() - 2)
//.rev()
// L1+ compactions
for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
{
// NOTE: Level count is 255 max
#[allow(clippy::cast_possible_truncation)]
Expand All @@ -202,20 +246,29 @@ impl CompactionStrategy for Strategy {
continue;
}

if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) {
continue;
}
let level_size: u64 = level
.segments
.iter()
// NOTE: Take bytes that are already being compacted into account,
// otherwise we may be overcompensating
.filter(|x| !levels.hidden_set().is_hidden(x.id()))
.map(|x| x.metadata.file_size)
.sum();

let desired_bytes = self.level_target_size(curr_level_index);

let overshoot = level.size().saturating_sub(desired_bytes);
let overshoot = level_size.saturating_sub(desired_bytes);

if overshoot > 0 {
let Some(next_level) = &resolved_view.get(next_level_index as usize) else {
let Some(next_level) = &view.get(next_level_index as usize) else {
break;
};

let (segment_ids, can_trivial_move) = pick_minimal_overlap(level, next_level);
let Some((segment_ids, can_trivial_move)) =
pick_minimal_compaction(level, next_level, levels.hidden_set(), overshoot)
else {
break;
};

// eprintln!(
// "merge {} segments, L{}->L{next_level_index}: {segment_ids:?}",
Expand Down Expand Up @@ -250,8 +303,11 @@ impl CompactionStrategy for Strategy {
}
}

// L0->L1 compactions
{
let Some(first_level) = resolved_view.first() else {
let busy_levels = levels.busy_levels();

let Some(first_level) = view.first() else {
return Choice::DoNothing;
};

Expand Down Expand Up @@ -296,22 +352,21 @@ impl CompactionStrategy for Strategy {
}

if !busy_levels.contains(&1) {
let mut level = first_level.clone();
let mut level = (**first_level).clone();
level.sort_by_key_range();

let Some(next_level) = &resolved_view.get(1) else {
let Some(next_level) = &view.get(1) else {
return Choice::DoNothing;
};

let mut segment_ids: HashSet<u64> =
level.iter().map(|x| x.metadata.id).collect();
let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();

// Get overlapping segments in next level
let key_range = aggregate_key_range(&level);

let next_level_overlapping_segment_ids: Vec<_> = next_level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.map(Segment::id)
.collect();

segment_ids.extend(&next_level_overlapping_segment_ids);
Expand Down Expand Up @@ -356,9 +411,6 @@ mod tests {
use std::{path::Path, sync::Arc};
use test_log::test;

#[cfg(feature = "bloom")]
use crate::bloom::BloomFilter;

fn string_key_range(a: &str, b: &str) -> KeyRange {
KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
}
Expand Down Expand Up @@ -415,7 +467,7 @@ mod tests {
block_cache,

#[cfg(feature = "bloom")]
bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
}
.into()
}
Expand Down
6 changes: 5 additions & 1 deletion src/compaction/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ pub fn choose_least_effort_compaction(segments: &[Segment], n: usize) -> HashSet
.min_by_key(|window| window.iter().map(|s| s.metadata.file_size).sum::<u64>())
.expect("should have at least one window");

window.iter().map(|x| x.metadata.id).collect()
window.iter().map(Segment::id).collect()
}

impl CompactionStrategy for Strategy {
/// Returns the static, human-readable name of this compaction strategy.
fn get_name(&self) -> &'static str {
"MaintenanceStrategy"
}

fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let resolved_view = levels.resolved_view();

Expand Down
8 changes: 6 additions & 2 deletions src/compaction/major.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{config::Config, level_manifest::LevelManifest};
use crate::{config::Config, level_manifest::LevelManifest, Segment};

/// Major compaction
///
Expand Down Expand Up @@ -35,8 +35,12 @@ impl Default for Strategy {
}

impl CompactionStrategy for Strategy {
/// Returns the static, human-readable name of this compaction strategy.
fn get_name(&self) -> &'static str {
"MajorCompaction"
}

fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let segment_ids = levels.iter().map(|x| x.metadata.id).collect();
let segment_ids = levels.iter().map(Segment::id).collect();

Choice::Merge(CompactionInput {
segment_ids,
Expand Down
4 changes: 4 additions & 0 deletions src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ pub enum Choice {
/// and emits a choice on what to do.
#[allow(clippy::module_name_repetitions)]
pub trait CompactionStrategy {
// TODO: the name could be provided through a `Display` supertrait bound
// instead of a dedicated method
/// Gets the compaction strategy name.
///
/// The returned string is a `'static` literal chosen by each implementor.
fn get_name(&self) -> &'static str;

/// Decides on what to do based on the current state of the LSM-tree's levels
///
/// Takes the level manifest and the tree configuration, and returns
/// the [`Choice`] describing the action to perform.
fn choose(&self, _: &LevelManifest, config: &Config) -> Choice;
}
Loading

0 comments on commit 38f3648

Please sign in to comment.