Parallel compactions for Leveled compaction #83

Merged (40 commits) on Dec 5, 2024

Commits
95bdeb0
preliminary parallel compactions
marvin-j97 Nov 26, 2024
2c8da7d
Merge remote-tracking branch 'origin/main' into leveled-parallel
marvin-j97 Nov 27, 2024
4dff1d2
refactor
marvin-j97 Nov 27, 2024
85a8151
add comments
marvin-j97 Nov 27, 2024
9081447
refactor: rename
marvin-j97 Nov 27, 2024
00604fb
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Nov 28, 2024
f49d476
clippy
marvin-j97 Nov 28, 2024
bcdbff1
change L0 bloom filter FPR on flush as well
marvin-j97 Nov 29, 2024
58c87d3
Merge branch 'main' into leveled-parallel
marvin-j97 Nov 29, 2024
b2492b4
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Nov 29, 2024
27b773b
take compacted bytes into account
marvin-j97 Nov 30, 2024
05beaa8
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Nov 30, 2024
d562c36
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Nov 30, 2024
87f642e
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Nov 30, 2024
77259f8
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Dec 1, 2024
d9c6c8e
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Dec 2, 2024
d726767
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Dec 4, 2024
faa3251
Merge branch '2.5.0' into leveled-parallel
marvin-j97 Dec 4, 2024
2d675ec
wip
marvin-j97 Dec 4, 2024
315d558
remove some ? in compaction worker
marvin-j97 Dec 4, 2024
c7ebaec
replace another ? in compaction worker
marvin-j97 Dec 4, 2024
37bf57d
wip
marvin-j97 Dec 4, 2024
d3e2f77
Improve HiddenSet ergonomics and fix a bug in the leveled compaction …
carlsverre Dec 4, 2024
d98c0eb
pass hidden set directly to pick_minimal_computation, but still keep …
carlsverre Dec 4, 2024
186fd9f
Update hidden_set.rs
marvin-j97 Dec 4, 2024
5c5fe26
Merge branch 'leveled-parallel' into leveled-parallel
marvin-j97 Dec 4, 2024
891011c
Update mod.rs
marvin-j97 Dec 4, 2024
1f6cbd3
Merge pull request #88 from carlsverre/leveled-parallel
marvin-j97 Dec 4, 2024
eeab262
cleanup
marvin-j97 Dec 5, 2024
27e7b4d
refactor
marvin-j97 Dec 5, 2024
46f2d83
fmt
marvin-j97 Dec 5, 2024
c219ff2
wip
marvin-j97 Dec 5, 2024
0cd43e6
refactor
marvin-j97 Dec 5, 2024
69b4038
clippy
marvin-j97 Dec 5, 2024
d4246ae
wip
marvin-j97 Dec 5, 2024
95ccb2b
refactor
marvin-j97 Dec 5, 2024
1ee2272
remove unneeded struct
marvin-j97 Dec 5, 2024
dde29cf
wip
marvin-j97 Dec 5, 2024
5806a14
refactor
marvin-j97 Dec 5, 2024
85a3ff7
refactor
marvin-j97 Dec 5, 2024

Files changed

src/compaction/fifo.rs: 15 changes (8 additions & 7 deletions)

@@ -40,6 +40,10 @@ impl Strategy {
}

impl CompactionStrategy for Strategy {
fn get_name(&self) -> &'static str {
"FifoStrategy"
}

fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
let resolved_view = levels.resolved_view();

@@ -58,11 +62,8 @@ impl CompactionStrategy for Strategy {
let lifetime_sec = lifetime_us / 1000 / 1000;

if lifetime_sec > ttl_seconds.into() {
log::warn!(
"segment is older than configured TTL: {:?}",
segment.metadata.id,
);
segment_ids_to_delete.insert(segment.metadata.id);
log::warn!("segment is older than configured TTL: {:?}", segment.id(),);
segment_ids_to_delete.insert(segment.id());
}
}
}
@@ -86,11 +87,11 @@ impl CompactionStrategy for Strategy {

bytes_to_delete = bytes_to_delete.saturating_sub(segment.metadata.file_size);

segment_ids_to_delete.insert(segment.metadata.id);
segment_ids_to_delete.insert(segment.id());

log::debug!(
"dropping segment to reach configured size limit: {:?}",
segment.metadata.id,
segment.id(),
);
}
}
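The TTL branch above reduces to a unit conversion and a comparison. A minimal, self-contained sketch of that check (simplified types; `SegmentMeta`, `created_at_us`, and the plain `u64` TTL are stand-ins for the crate's actual segment metadata and config):

```rust
use std::collections::HashSet;

type SegmentId = u64;

/// Simplified stand-in for the crate's segment metadata.
struct SegmentMeta {
    id: SegmentId,
    /// Creation timestamp in microseconds.
    created_at_us: u64,
}

/// Collects the IDs of segments whose age exceeds the configured TTL,
/// mirroring the microseconds-to-seconds conversion in the hunk above.
fn expired_segments(segments: &[SegmentMeta], now_us: u64, ttl_seconds: u64) -> HashSet<SegmentId> {
    let mut segment_ids_to_delete = HashSet::new();

    for segment in segments {
        let lifetime_us = now_us.saturating_sub(segment.created_at_us);
        let lifetime_sec = lifetime_us / 1000 / 1000;

        if lifetime_sec > ttl_seconds {
            segment_ids_to_delete.insert(segment.id);
        }
    }

    segment_ids_to_delete
}

fn main() {
    let one_hour_us: u64 = 3_600 * 1_000_000;

    let segments = vec![
        SegmentMeta { id: 1, created_at_us: 0 },
        SegmentMeta { id: 2, created_at_us: one_hour_us },
    ];

    // With a 30-minute TTL evaluated one hour in, only segment 1 has expired.
    let expired = expired_segments(&segments, one_hour_us, 30 * 60);
    assert!(expired.contains(&1) && !expired.contains(&2));
}
```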
src/compaction/leveled.rs: 170 changes (111 additions & 59 deletions)

@@ -6,27 +6,64 @@ use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
config::Config,
key_range::KeyRange,
level_manifest::{level::Level, LevelManifest},
level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
segment::Segment,
HashSet, SegmentId,
};

/// Aggregates the key range of a list of segments.
fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
}

// TODO: Currently does not take in `overshoot`
// TODO: Need to make sure compactions are not too small
fn pick_minimal_overlap(curr_level: &Level, next_level: &Level) -> (HashSet<SegmentId>, bool) {
/// Tries to find the most optimal compaction set from
/// one level into the other.
fn pick_minimal_compaction(
curr_level: &Level,
next_level: &Level,
hidden_set: &HiddenSet,
overshoot: u64,
) -> Option<(HashSet<SegmentId>, bool)> {
// assert!(curr_level.is_disjoint, "Lx is not disjoint");
// assert!(next_level.is_disjoint, "Lx+1 is not disjoint");

struct Choice {
write_amp: f32,
segment_ids: HashSet<SegmentId>,
can_trivial_move: bool,
}

let mut choices = vec![];

let mut add_choice = |choice: Choice| {
let mut valid_choice = true;

// IMPORTANT: Compaction is blocked because of other
// on-going compaction
valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));

// NOTE: Keep compactions with 25 or less segments
// to make compactions not too large
//
// TODO: ideally, if a level has a lot of compaction debt
// compactions could be parallelized as long as they don't overlap in key range
valid_choice &= choice.segment_ids.len() <= 25;

if valid_choice {
choices.push(choice);
}
};

for size in 1..=next_level.len() {
let windows = next_level.windows(size);

for window in windows {
if hidden_set.is_blocked(window.iter().map(Segment::id)) {
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
continue;
}

let key_range = aggregate_key_range(window);

// Pull in all segments in current level into compaction
@@ -55,21 +92,32 @@
curr_level.overlapping_segments(&key_range).collect()
};

if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
continue;
}

let curr_level_size = curr_level_pull_in
.iter()
.map(|x| x.metadata.file_size)
.sum::<u64>();

// NOTE: Only consider compactions where we actually do some merging
if curr_level_size > 0 {
// NOTE: Only consider compactions where we actually reach the amount
// of bytes we need to merge
if curr_level_size >= overshoot {
let next_level_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();

let mut segment_ids: HashSet<_> = window.iter().map(|x| x.metadata.id).collect();
segment_ids.extend(curr_level_pull_in.iter().map(|x| x.metadata.id));
let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));

let write_amp = (next_level_size as f32) / (curr_level_size as f32);

choices.push((write_amp, segment_ids, false));
add_choice(Choice {
write_amp,
segment_ids,
can_trivial_move: false,
});
}
}
}
@@ -79,39 +127,36 @@
let windows = curr_level.windows(size);

for window in windows {
let segment_ids: HashSet<SegmentId> = window.iter().map(|x| x.metadata.id).collect();
let segment_ids: HashSet<SegmentId> = window.iter().map(Segment::id).collect();

let key_range = aggregate_key_range(window);

if next_level.overlapping_segments(&key_range).next().is_none() {
choices.push((0.0, segment_ids, true));
add_choice(Choice {
write_amp: 0.0,
segment_ids,
can_trivial_move: true,
});
}
}
}

// NOTE: Keep compactions with 25 or less segments
// to make compactions not too large
//
// TODO: ideally, if a level has a lot of compaction debt
// compactions could be parallelized as long as they don't overlap in key range
choices.retain(|(_, segments, _)| segments.len() <= 25);
let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
a.write_amp
.partial_cmp(&b.write_amp)
.unwrap_or(std::cmp::Ordering::Equal)
});

let minimum_effort_choice = choices
.into_iter()
.min_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));

let (_, set, can_trivial_move) = minimum_effort_choice.expect("should exist");

(set, can_trivial_move)
minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
}

/// Levelled compaction strategy (LCS)
///
/// If a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
///
/// Each level Ln for n >= 1 can have up to ratio^n segments.
/// Each level Ln for n >= 2 can have up to `level_base_size * ratio^n` segments.
///
/// LCS suffers from comparatively high write amplification, but has decent read & space amplification.
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
@@ -162,6 +207,14 @@ impl Default for Strategy {
}

impl Strategy {
/// Calculates the level target size.
///
/// L1 = `level_base_size`
///
/// L2 = `level_base_size * ratio`
///
/// L3 = `level_base_size * ratio * ratio`
/// ...
fn level_target_size(&self, level_idx: u8) -> u64 {
assert!(level_idx >= 1, "level_target_size does not apply to L0");

@@ -172,25 +225,16 @@ impl Strategy {
}

impl CompactionStrategy for Strategy {
fn get_name(&self) -> &'static str {
"LeveledStrategy"
}

#[allow(clippy::too_many_lines)]
fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let resolved_view = levels.resolved_view();
let view = &levels.levels;

// If there are any levels that already have a compactor working on it
// we can't touch those, because that could cause a race condition
// violating the leveled compaction invariance of having a single sorted
// run per level
//
// TODO: However, this can probably improved by checking two compaction
// workers just don't cross key ranges
let busy_levels = levels.busy_levels();

for (curr_level_index, level) in resolved_view
.iter()
.enumerate()
.skip(1)
.take(resolved_view.len() - 2)
//.rev()
// L1+ compactions
for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
{
// NOTE: Level count is 255 max
#[allow(clippy::cast_possible_truncation)]
@@ -202,20 +246,29 @@ impl CompactionStrategy for Strategy {
continue;
}

if busy_levels.contains(&curr_level_index) || busy_levels.contains(&next_level_index) {
continue;
}
let level_size: u64 = level
.segments
.iter()
// NOTE: Take bytes that are already being compacted into account,
// otherwise we may be overcompensating
.filter(|x| !levels.hidden_set().is_hidden(x.id()))
.map(|x| x.metadata.file_size)
.sum();

let desired_bytes = self.level_target_size(curr_level_index);

let overshoot = level.size().saturating_sub(desired_bytes);
let overshoot = level_size.saturating_sub(desired_bytes);

if overshoot > 0 {
let Some(next_level) = &resolved_view.get(next_level_index as usize) else {
let Some(next_level) = &view.get(next_level_index as usize) else {
break;
};

let (segment_ids, can_trivial_move) = pick_minimal_overlap(level, next_level);
let Some((segment_ids, can_trivial_move)) =
pick_minimal_compaction(level, next_level, levels.hidden_set(), overshoot)
else {
break;
};

// eprintln!(
// "merge {} segments, L{}->L{next_level_index}: {segment_ids:?}",
@@ -250,8 +303,11 @@ impl CompactionStrategy for Strategy {
}
}

// L0->L1 compactions
{
let Some(first_level) = resolved_view.first() else {
let busy_levels = levels.busy_levels();

let Some(first_level) = view.first() else {
return Choice::DoNothing;
};

@@ -296,22 +352,21 @@
}

if !busy_levels.contains(&1) {
let mut level = first_level.clone();
let mut level = (**first_level).clone();
level.sort_by_key_range();

let Some(next_level) = &resolved_view.get(1) else {
let Some(next_level) = &view.get(1) else {
return Choice::DoNothing;
};

let mut segment_ids: HashSet<u64> =
level.iter().map(|x| x.metadata.id).collect();
let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();

// Get overlapping segments in next level
let key_range = aggregate_key_range(&level);

let next_level_overlapping_segment_ids: Vec<_> = next_level
.overlapping_segments(&key_range)
.map(|x| x.metadata.id)
.map(Segment::id)
.collect();

segment_ids.extend(&next_level_overlapping_segment_ids);
@@ -356,9 +411,6 @@ mod tests {
use std::{path::Path, sync::Arc};
use test_log::test;

#[cfg(feature = "bloom")]
use crate::bloom::BloomFilter;

fn string_key_range(a: &str, b: &str) -> KeyRange {
KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
}
@@ -415,7 +467,7 @@
block_cache,

#[cfg(feature = "bloom")]
bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
}
.into()
}
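The sizing arithmetic behind the L1+ branch in `choose` above is plain exponential growth plus an overshoot check. A minimal sketch of that scheme, assuming the field names `level_base_size` and `level_ratio` from the doc comment rather than the crate's actual struct:

```rust
/// Illustrative mirror of the leveled strategy's sizing logic.
struct Sizing {
    /// Target size of L1 in bytes (e.g. 64 MiB).
    level_base_size: u64,
    /// Growth factor between levels (e.g. 8).
    level_ratio: u64,
}

impl Sizing {
    /// L1 = base, L2 = base * ratio, L3 = base * ratio^2, ...
    fn level_target_size(&self, level_idx: u8) -> u64 {
        assert!(level_idx >= 1, "does not apply to L0");
        self.level_base_size * self.level_ratio.pow(u32::from(level_idx) - 1)
    }

    /// How many bytes a level is over its target; 0 means no compaction is needed.
    /// `level_size` should already exclude segments hidden by ongoing compactions,
    /// which is what the filter over `hidden_set()` above achieves.
    fn overshoot(&self, level_idx: u8, level_size: u64) -> u64 {
        level_size.saturating_sub(self.level_target_size(level_idx))
    }
}

fn main() {
    let sizing = Sizing {
        level_base_size: 64 * 1024 * 1024,
        level_ratio: 8,
    };

    assert_eq!(sizing.level_target_size(1), 64 * 1024 * 1024);
    assert_eq!(sizing.level_target_size(3), 4 * 1024 * 1024 * 1024);
    assert_eq!(sizing.overshoot(1, 80 * 1024 * 1024), 16 * 1024 * 1024);
}
```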
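The core of `pick_minimal_compaction` is the candidate ranking: each candidate compaction is scored by estimated write amplification (next-level bytes divided by current-level bytes), candidates touching hidden (already compacting) segments or spanning more than 25 segments are discarded, and the cheapest remaining candidate wins. A stripped-down sketch of that selection step (candidate generation omitted; `Candidate` and the plain `HashSet` of hidden IDs are stand-ins for the PR's internal `Choice` struct and `HiddenSet`):

```rust
use std::collections::HashSet;

type SegmentId = u64;

/// Stand-in for the internal `Choice` struct used by `pick_minimal_compaction`.
struct Candidate {
    write_amp: f32,
    segment_ids: HashSet<SegmentId>,
    can_trivial_move: bool,
}

/// Picks the candidate with the lowest estimated write amplification,
/// skipping candidates that are blocked by ongoing compactions or too large.
fn pick_cheapest(
    candidates: Vec<Candidate>,
    hidden: &HashSet<SegmentId>,
) -> Option<(HashSet<SegmentId>, bool)> {
    candidates
        .into_iter()
        // Blocked: overlaps another ongoing compaction
        .filter(|c| !c.segment_ids.iter().any(|id| hidden.contains(id)))
        // Keep compactions at 25 segments or fewer so they don't get too large
        .filter(|c| c.segment_ids.len() <= 25)
        .min_by(|a, b| {
            a.write_amp
                .partial_cmp(&b.write_amp)
                .unwrap_or(std::cmp::Ordering::Equal)
        })
        .map(|c| (c.segment_ids, c.can_trivial_move))
}

fn main() {
    let hidden = HashSet::from([7]);

    let candidates = vec![
        Candidate {
            write_amp: 2.0,
            segment_ids: HashSet::from([1, 2]),
            can_trivial_move: false,
        },
        Candidate {
            write_amp: 0.0,
            segment_ids: HashSet::from([7]),
            can_trivial_move: true,
        },
    ];

    // The second candidate would be cheaper, but it overlaps an ongoing
    // compaction (segment 7), so the first one is picked.
    let (ids, trivial) = pick_cheapest(candidates, &hidden).unwrap();
    assert_eq!(ids, HashSet::from([1, 2]));
    assert!(!trivial);
}
```

Returning `Option` instead of the old `expect("should exist")` means that, rather than panicking, `choose` can bail out when every candidate at a level is blocked by the hidden set.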
src/compaction/maintenance.rs: 6 changes (5 additions & 1 deletion)

@@ -39,10 +39,14 @@ pub fn choose_least_effort_compaction(segments: &[Segment], n: usize) -> HashSet
.min_by_key(|window| window.iter().map(|s| s.metadata.file_size).sum::<u64>())
.expect("should have at least one window");

window.iter().map(|x| x.metadata.id).collect()
window.iter().map(Segment::id).collect()
}

impl CompactionStrategy for Strategy {
fn get_name(&self) -> &'static str {
"MaintenanceStrategy"
}

fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let resolved_view = levels.resolved_view();

src/compaction/major.rs: 8 changes (6 additions & 2 deletions)

@@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{config::Config, level_manifest::LevelManifest};
use crate::{config::Config, level_manifest::LevelManifest, Segment};

/// Major compaction
///
@@ -35,8 +35,12 @@ impl Default for Strategy {
}

impl CompactionStrategy for Strategy {
fn get_name(&self) -> &'static str {
"MajorCompaction"
}

fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
let segment_ids = levels.iter().map(|x| x.metadata.id).collect();
let segment_ids = levels.iter().map(Segment::id).collect();

Choice::Merge(CompactionInput {
segment_ids,
src/compaction/mod.rs: 4 changes (4 additions & 0 deletions)

@@ -69,6 +69,10 @@ pub enum Choice {
/// and emits a choice on what to do.
#[allow(clippy::module_name_repetitions)]
pub trait CompactionStrategy {
// TODO: could be : Display instead
/// Gets the compaction strategy name.
fn get_name(&self) -> &'static str;

/// Decides on what to do based on the current state of the LSM-tree's levels
fn choose(&self, _: &LevelManifest, config: &Config) -> Choice;
}
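The new `get_name` method added to the trait above is what lets each strategy identify itself (as the FIFO, leveled, maintenance, and major impls in this PR now do). A minimal sketch of a custom strategy conforming to the extended trait; the trait and surrounding types are simplified stand-ins for the crate's `LevelManifest`, `Config`, and `Choice`:

```rust
// Simplified stand-ins, just to show the shape of the extended trait.
struct LevelManifest;
struct Config;

enum Choice {
    DoNothing,
}

trait CompactionStrategy {
    /// Gets the compaction strategy name.
    fn get_name(&self) -> &'static str;

    /// Decides on what to do based on the current state of the LSM-tree's levels.
    fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice;
}

/// A do-nothing strategy, e.g. for tests or read-only workloads.
struct NoopStrategy;

impl CompactionStrategy for NoopStrategy {
    fn get_name(&self) -> &'static str {
        "NoopStrategy"
    }

    fn choose(&self, _: &LevelManifest, _: &Config) -> Choice {
        Choice::DoNothing
    }
}

fn main() {
    let strategy = NoopStrategy;

    // A compaction worker could log which strategy it is about to run.
    println!("running compaction strategy: {}", strategy.get_name());

    let _ = strategy.choose(&LevelManifest, &Config);
}
```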