From dd468305fac3d4a6f146df26a159c4a1b4f21996 Mon Sep 17 00:00:00 2001
From: Kai Norman Clasen
Date: Fri, 7 Jun 2024 19:12:58 +0200
Subject: [PATCH] Rewrite code to embed source information

---
 src/main.rs | 298 +++++++++++++++++++++++++++++++++-------------------
 1 file changed, 188 insertions(+), 110 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index fe6ef83..5077213 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -43,13 +43,8 @@ use walkdir::WalkDir;
 //     F32(Wrapper),
 // }
 
-enum SupportedWrapper<D> {
-    U16(Array<u16, D>),
-    F32(Array<f32, D>),
-}
-
 // struct DataKeyPair<'a> {
-//     path: &'a Path,
+//     path: &'a Path
 
 // FUTURE: Fix this anti-pattern
 // it should be structure of arrays not
@@ -57,18 +52,40 @@ enum SupportedWrapper<D> {
 #[derive(Debug)]
 struct DataKeyPair {
     path: PathBuf,
-    safetensor_key: String,
+    safetensors_key: String,
     // could add explicit indexes to read from a given band
 }
 
-/// Encoder that converts TIFF files into `safetensor` values and embeds them in an LMDB database.
+#[derive(Debug)]
+enum TypedDataKeyPair {
+    BigEarthNetS1(DataKeyPair),
+    BigEarthNetS2(DataKeyPair),
+    // ENMAP
+}
+
+impl TypedDataKeyPair {
+    fn get_safetensors_key(&self) -> &String {
+        match self {
+            Self::BigEarthNetS1(d) => &d.safetensors_key,
+            Self::BigEarthNetS2(d) => &d.safetensors_key,
+        }
+    }
+}
+
+#[derive(Debug)]
+enum Satellite {
+    Sentinel1,
+    Sentinel2,
+}
+
+/// Encoder that converts TIFF files into `safetensors` values and embeds them in an LMDB database.
 #[derive(Parser)]
 #[command(
     author,
     version,
     about,
     long_about = "
-Encoder that converts TIFF files into `safetensor` values and embeds them in an LMDB database.
+Encoder that converts TIFF files into `safetensors` values and embeds them in an LMDB database.
 
 The CLI will have one required argument, which is the path to the target LMDB file
 (in reality it is a directory).
 Then it will take named arguments that indicate which dataset is being converted:
 These will take the path to the root directory and find the TIFF files based on
 the dataset-specific naming conventions.
 
 Each dataset will have its own mapping function to:
 - filter out wrong tiff files (files not matching a specific regular expression)
-- generate a unique key for each safetensor 'value'
+- generate a unique key for each safetensors 'value'
   - Important: The key is ONLY guaranteed to be unique for each individual dataset!
-- Generate the safetensor value from the files, which will usually include each band as an individual key
+- Generate the safetensors value from the files, which will usually include each band as an individual key
 
 By providing multiple dataset sources, all files will be written into a single LMDB file,
 potentially making it easier to work with the dataset.
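Aside: the heart of this change is the `TypedDataKeyPair` wrapper, which tags each `DataKeyPair` with the dataset it came from while shared code keeps using a single accessor. A minimal, self-contained sketch of that pattern (not part of the patch; the file name and band key below are hypothetical):

```rust
use std::path::PathBuf;

#[derive(Debug)]
struct DataKeyPair {
    path: PathBuf,
    safetensors_key: String,
}

#[derive(Debug)]
enum TypedDataKeyPair {
    BigEarthNetS1(DataKeyPair),
    BigEarthNetS2(DataKeyPair),
}

impl TypedDataKeyPair {
    fn get_safetensors_key(&self) -> &String {
        // both variants carry the same payload, so one or-pattern suffices
        match self {
            Self::BigEarthNetS1(d) | Self::BigEarthNetS2(d) => &d.safetensors_key,
        }
    }
}

fn main() {
    // hypothetical Sentinel-2 band file
    let pair = TypedDataKeyPair::BigEarthNetS2(DataKeyPair {
        path: PathBuf::from("S2A_MSIL2A_20170613T101031_0_45_B02.tif"),
        safetensors_key: "B02".to_string(),
    });
    // the source information now travels with the value
    println!("{:?} -> {}", pair, pair.get_safetensors_key());
}
```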
@@ -128,14 +145,16 @@ fn main() -> anyhow::Result<()> {
     let mut v = Vec::new();
     if let Some(bigearthnet_s1_root) = cli.bigearthnet_s1_root {
         println!("Starting to process BigEarthNet-S1");
-        v.push(generate_grouped_files_from_bigearthnet_s1(
+        v.push(generate_grouped_files_from_bigearthnet(
             bigearthnet_s1_root.to_str().unwrap(),
+            Satellite::Sentinel1,
         ));
     }
 
     if let Some(bigearthnet_s2_root) = cli.bigearthnet_s2_root {
         println!("Starting to process BigEarthNet-S2");
-        v.push(generate_grouped_files_from_bigearthnet_s2(
+        v.push(generate_grouped_files_from_bigearthnet(
             bigearthnet_s2_root.to_str().unwrap(),
+            Satellite::Sentinel2,
         ));
     }
 
@@ -162,30 +181,29 @@ fn main() -> anyhow::Result<()> {
     Ok(())
 }
 
-/// Check that the HashMap that contains the `key`-String and the `DataKeyPair` data
-/// does not contain any duplicate data and that for each `key` we have the same matching files
-///
-fn check_grouped_files(grouped_files: &HashMap<String, Vec<DataKeyPair>>) {
-    // create a set of safetensor_key values for each vector, while ensuring that these
+/// Check that the HashMap that contains the `group`-String and the `TypedDataKeyPair` data
+/// does not contain any duplicate data and that for each `group` we have the same matching files
+fn check_grouped_files(grouped_files: &HashMap<String, Vec<TypedDataKeyPair>>) {
+    // create a set of safetensors_key values for each vector, while ensuring that the
     // length of the set is equal to the length of the vector -> Ensuring that there aren't any duplicated keys
-    let safetensor_keys_sets = grouped_files
+    let safetensors_keys_sets = grouped_files
         .values()
         .map(|vec| {
-            let safetensor_keys_set = vec.iter().fold(HashSet::new(), |mut acc, d| {
-                acc.insert(d.safetensor_key.clone());
+            let safetensors_keys_set = vec.iter().fold(HashSet::new(), |mut acc, d| {
+                acc.insert(d.get_safetensors_key().clone());
                 acc
             });
-            if safetensor_keys_set.len() != vec.len() {
+            if safetensors_keys_set.len() != vec.len() {
                 panic!(
                     "Safetensor keys are duplicated! This should never happen! Report as bug: {:?}",
                     vec
                 );
             }
-            safetensor_keys_set
+            safetensors_keys_set
         })
         .collect::<Vec<_>>();
     // check that all have the same number of values per match!
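The duplicate check above hinges on a simple invariant: inserting a vector's keys into a `HashSet` yields a smaller set exactly when a key repeats. A standalone sketch (not part of the patch; the band keys are hypothetical):

```rust
use std::collections::HashSet;

fn main() {
    let keys = vec!["B01", "B02", "B02"]; // "B02" is duplicated
    let set: HashSet<_> = keys.iter().collect();
    // a duplicate shrinks the set, so unequal lengths signal the bug
    assert!(set.len() != keys.len());
    assert_eq!(set.len(), 2);
}
```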
-    let safetensor_keys = safetensor_keys_sets.into_iter().reduce(|mut acc, h| {
+    let safetensors_keys = safetensors_keys_sets.into_iter().reduce(|mut acc, h| {
         let next_set_len = h.len();
         acc.extend(h);
         if acc.len() != next_set_len {
@@ -194,7 +212,7 @@ fn check_grouped_files(grouped_files: &HashMap<String, Vec<DataKeyPair>>) {
         acc
     }).expect("Should be non-empty iterable");
 
-    let mut pretty_keys = safetensor_keys
+    let mut pretty_keys = safetensors_keys
         .iter()
         .map(|x| x.clone())
         .collect::<Vec<_>>();
@@ -215,7 +233,13 @@ fn recursively_find_tiffs(path: &str) -> Vec<PathBuf> {
         .into_iter()
         .filter_map(|e| e.ok())
         .filter(|e| {
-            let extension = e.path().extension().unwrap_or_default().to_str().unwrap();
+            let extension = e
+                .path()
+                .extension()
+                .unwrap_or_default()
+                .to_str()
+                .unwrap()
+                .to_lowercase();
             extension == "tiff" || extension == "tif"
         })
         .map(|e| e.path().to_path_buf())
@@ -255,29 +279,85 @@ fn bigearthnet_s2_ordering(a: &str, b: &str) -> Ordering {
     }
 }
 
+// fn generate_grouped_files_from_enmap(
+//     root_enmap_dir: &str,
+// ) -> HashMap<String, Vec<TypedDataKeyPair>> {
+//     let paths = recursively_find_tiffs(root_enmap_dir);
+//     // group should be tiff name
+//     // safetensors key should be SPECTRAL
+//     // enmap/tiles/ENMAP01-____L2A-DT0000004950_20221103T162438Z_001_V010110_20221118T145147Z/ENMAP01-____L2A-DT0000004950_20221103T162438Z_001_V010110_20221118T145147Z-SPECTRAL_IMAGE.TIF
+//     let enmap_image_pattern =
+//         Regex::new(r"(?<group>.*)-(?<safetensorsKey>SPECTRAL_IMAGE)$").unwrap();
+//     // let mut grouped_files =
+//     //     generate_grouped_files_from_bigearthnet_paths(paths, &enmap_image_pattern);
+
+//     let mut grouped_files: HashMap<String, Vec<TypedDataKeyPair>> = HashMap::new();
+//     for p in paths {
+//         let cap_res = enmap_image_pattern.captures(p.file_stem().unwrap().to_str().unwrap());
+//         match cap_res {
+//             Some(cap) => grouped_files
+//                 .entry(cap["group"].to_string())
+//                 .or_default()
+//                 .push(TypedDataKeyPair {
+//                     path: p.clone(),
+//                     safetensors_key: cap["safetensorsKey"].to_string(),
+//                 }),
+//             None => {} // simply skip over files that do not match without printing anything, as the dataset contains additional files
+//         }
+//     }
+//     info!("Finished grouping enmap data!");

+//     // THINK: I believe that no ordering is necessary, as every element will only be a single key-tensor pair

+//     // for vals in grouped_files.values_mut() {
+//     //     vals.sort_by(|a, b| bigearthnet_s1_ordering(&a.safetensors_key, &b.safetensors_key));
+//     // }
+//     // needs to be checked before the grouped_files are merged together!
+//     if grouped_files.len() == 0 {
+//         println!("No matching tiff files found! Skipping...");
+//     } else {
+//         // needs to be checked before the grouped_files are merged together!
+//         check_grouped_files(&grouped_files);
+//     }
+//     grouped_files
+// }
+
+/// Depending on `satellite`, select a regular expression with the capture groups
+/// `group` and `safetensorsKey`, loop over all `paths` (files), extract the file
+/// stems, and apply the regular expression.
+/// The extracted `group` will become the key of the returned HashMap
+/// and the associated value will be pushed to a vector, where each value is a `DataKeyPair`
+/// with `path` set to the considered path and `safetensors_key` set to the matched
+/// `safetensorsKey` from the regular expression.
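To illustrate the renamed capture groups: the Sentinel-2 pattern splits a band file stem into the LMDB `group` key and the per-band `safetensorsKey`. A sketch assuming the `regex` crate (1.9+ for the `(?<name>...)` syntax; the stem is a hypothetical example of the BigEarthNet-S2 naming scheme):

```rust
use regex::Regex;

fn main() {
    let pattern = Regex::new(r"(?<group>.*)_(?<safetensorsKey>B[0-9A]+)$").unwrap();
    let cap = pattern.captures("S2A_MSIL2A_20170613T101031_0_45_B8A").unwrap();
    // the greedy `.*` consumes everything up to the last underscore
    assert_eq!(&cap["group"], "S2A_MSIL2A_20170613T101031_0_45");
    // `B[0-9A]+` also matches the 8A band, hence the `A` in the character class
    assert_eq!(&cap["safetensorsKey"], "B8A");
}
```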
 fn generate_grouped_files_from_bigearthnet_paths(
     paths: Vec<PathBuf>,
-    pattern: &Regex,
-) -> HashMap<String, Vec<DataKeyPair>> {
-    // loop over all input directories
-    // and give option to merge them into single LMDB file
-    let mut grouped_files: HashMap<String, Vec<DataKeyPair>> = HashMap::new();
-    // grouped_files contains the LMDB key as key
-    // and the Vec<DataKeyPair> to generate the safetensor later
-    // this should be the return value for each dataset
-
+    satellite: Satellite,
+) -> HashMap<String, Vec<TypedDataKeyPair>> {
+    let mut grouped_files: HashMap<String, Vec<TypedDataKeyPair>> = HashMap::new();
+    // when parallelizing the regex matching, check:
+    // https://docs.rs/regex/latest/regex/#sharing-a-regex-across-threads-can-result-in-contention
+    let pattern_str = match satellite {
+        Satellite::Sentinel2 => r"(?<group>.*)_(?<safetensorsKey>B[0-9A]+)$",
+        Satellite::Sentinel1 => r"(?<group>.*)_(?<safetensorsKey>V[VH])$",
+    };
+    let pattern = Regex::new(&pattern_str).unwrap();
     // FUTURE: potentially think about parallel access as NFS storage could benefit from it
     for p in paths {
         // fix last unwrap
         let cap_res = pattern.captures(p.file_stem().unwrap().to_str().unwrap());
         match cap_res {
-            Some(cap) => grouped_files
-                .entry(cap["prefix"].to_string())
-                .or_default()
-                .push(DataKeyPair {
+            Some(cap) => {
+                let datakeypair = DataKeyPair {
                     path: p.clone(),
-                    safetensor_key: cap["key"].to_string(),
-                }),
+                    safetensors_key: cap["safetensorsKey"].to_string(),
+                };
+                grouped_files
+                    .entry(cap["group"].to_string())
+                    .or_default()
+                    .push(match satellite {
+                        Satellite::Sentinel1 => TypedDataKeyPair::BigEarthNetS1(datakeypair),
+                        Satellite::Sentinel2 => TypedDataKeyPair::BigEarthNetS2(datakeypair),
+                    })
+            }
             None => {
                 warn!("Found a tiff file that doesn't match the expected regular expression: \n{}\nThis might indicate issues with the dataset directory!", p.to_str().unwrap_or(""));
             }
@@ -287,40 +367,24 @@ fn generate_grouped_files_from_bigearthnet_paths(
     grouped_files
 }
 
-fn generate_grouped_files_from_bigearthnet_s2(
-    root_ben_s2_dir: &str,
-) -> HashMap<String, Vec<DataKeyPair>> {
-    let paths = recursively_find_tiffs(root_ben_s2_dir);
-    // when parallelizing the regex matching, check:
-    // https://docs.rs/regex/latest/regex/#sharing-a-regex-across-threads-can-result-in-contention
-    let ben_s2_stem_pattern = Regex::new(r"(?<prefix>.*)_(?<key>B[0-9A]+)$").unwrap();
-    let mut grouped_files =
-        generate_grouped_files_from_bigearthnet_paths(paths, &ben_s2_stem_pattern);
+fn generate_grouped_files_from_bigearthnet(
+    root_ben_dir: &str,
+    satellite: Satellite,
+) -> HashMap<String, Vec<TypedDataKeyPair>> {
+    let paths = recursively_find_tiffs(root_ben_dir);
+    let mut grouped_files = generate_grouped_files_from_bigearthnet_paths(paths, satellite);
     for vals in grouped_files.values_mut() {
-        vals.sort_by(|a, b| bigearthnet_s2_ordering(&a.safetensor_key, &b.safetensor_key));
-    }
-    if grouped_files.len() == 0 {
-        println!("No matching tiff files found! Skipping...");
-    } else {
-        // needs to be checked before the grouped_files are merged together!
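The grouping itself is the `entry(...).or_default().push(...)` idiom: the first band seen for a patch creates the empty vector, and every later band is appended to it. Reduced to its essentials (not part of the patch; group and key names are hypothetical):

```rust
use std::collections::HashMap;

fn main() {
    let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
    for (group, key) in [("patch_a", "VV"), ("patch_a", "VH"), ("patch_b", "VV")] {
        // `or_default` inserts an empty Vec the first time a group is seen
        grouped.entry(group.to_string()).or_default().push(key.to_string());
    }
    assert_eq!(grouped["patch_a"], vec!["VV", "VH"]);
    assert_eq!(grouped["patch_b"], vec!["VV"]);
}
```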
-        check_grouped_files(&grouped_files);
-    }
-    grouped_files
-}
-
-fn generate_grouped_files_from_bigearthnet_s1(
-    root_ben_s1_dir: &str,
-) -> HashMap<String, Vec<DataKeyPair>> {
-    let paths = recursively_find_tiffs(root_ben_s1_dir);
-    // when parallelizing the regex matching, check:
-    // https://docs.rs/regex/latest/regex/#sharing-a-regex-across-threads-can-result-in-contention
-    let ben_s1_stem_pattern = Regex::new(r"(?<prefix>.*)_(?<key>V[VH])$").unwrap();
-    let mut grouped_files =
-        generate_grouped_files_from_bigearthnet_paths(paths, &ben_s1_stem_pattern);
-    for vals in grouped_files.values_mut() {
-        vals.sort_by(|a, b| bigearthnet_s1_ordering(&a.safetensor_key, &b.safetensor_key));
+        vals.sort_by(|a, b| match (a, b) {
+            // sorting is only defined between keys of the same satellite
+            (TypedDataKeyPair::BigEarthNetS2(a), TypedDataKeyPair::BigEarthNetS2(b)) => {
+                bigearthnet_s2_ordering(&a.safetensors_key, &b.safetensors_key)
+            }
+            (TypedDataKeyPair::BigEarthNetS1(a), TypedDataKeyPair::BigEarthNetS1(b)) => {
+                bigearthnet_s1_ordering(&a.safetensors_key, &b.safetensors_key)
+            }
+            _ => panic!("Unsupported ordering operation!"),
+        })
     }
-    // needs to be checked before the grouped_files are merged together!
     if grouped_files.len() == 0 {
         println!("No matching tiff files found! Skipping...");
     } else {
@@ -330,7 +394,7 @@ fn generate_grouped_files_from_bigearthnet_s1(
     grouped_files
 }
 
-fn lmdb_writer(db_path: &Path, grouped_files: &HashMap<String, Vec<DataKeyPair>>) {
+fn lmdb_writer(db_path: &Path, grouped_files: &HashMap<String, Vec<TypedDataKeyPair>>) {
     fs::create_dir_all(db_path)
         .expect("should be able to create target directory. Maybe check permissions?");
     let env = EnvOpenOptions::new()
@@ -343,13 +407,11 @@ fn lmdb_writer(db_path: &Path, grouped_files: &HashMap<String, Vec<DataKeyPair>>) {
     // FUTURE: think about working with references instead of cloning
     let mut keys: Vec<String> = grouped_files.keys().map(|e| e.clone()).collect();
-    // could be changed in the future to sort not only by the prefix
+    // could be changed in the future to sort not only by the `group`
     // Remember, this only defines how the LMDB file is written, not how the
     // safetensors are written!
     keys.sort();
 
-    // TODO: Try to get it running with an older GLIBC version!
-    // -> Just bite the bullet and build it as nix image and docker image
     // Chunk size was chosen more or less randomly. The main idea is to
     // not close & open a write transaction for every item and I also use
     // it to open the files
@@ -360,9 +422,13 @@ fn lmdb_writer(db_path: &Path, grouped_files: &HashMap<String, Vec<DataKeyPair>>) {
         // maybe the chunk could be split into different threads and the tensors can be shared
         let keyed_tensors: Vec<(&String, Vec<u8>)> = chunk
             .into_par_iter()
-            .map(|key| (key, mk_safetensor(grouped_files.get(key).unwrap()).unwrap()))
+            .map(|key| {
+                (
+                    key,
+                    mk_safetensors(grouped_files.get(key).unwrap()).unwrap(),
+                )
+            })
             .collect();
-        // FUTURE: Write a test that ensures that the output remains stable!
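For the writer loop: each chunk of LMDB keys is encoded in parallel with rayon and then written inside a single transaction, so transactions are opened per chunk instead of per item. A sketch of that shape, assuming the `rayon` crate (the byte-encoding closure is a hypothetical stand-in for `mk_safetensors`):

```rust
use rayon::prelude::*;

fn main() {
    let keys: Vec<String> = (0..10).map(|i| format!("key{i}")).collect();
    for chunk in keys.chunks(4) {
        // encode the whole chunk in parallel...
        let keyed_tensors: Vec<(&String, Vec<u8>)> = chunk
            .into_par_iter()
            .map(|key| (key, key.as_bytes().to_vec())) // stand-in for mk_safetensors
            .collect();
        // ...then a single write transaction would consume `keyed_tensors` here
        assert!(keyed_tensors.len() <= 4);
    }
}
```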
         for (key, tns) in keyed_tensors {
             db.put(&mut wtxn, key, &tns).expect("should write");
         }
@@ -370,45 +436,57 @@ fn lmdb_writer(db_path: &Path, grouped_files: &HashMap<String, Vec<DataKeyPair>>) {
     }
 }
 
+enum SupportedWrapper<D> {
+    U16(Array<u16, D>),
+    F32(Array<f32, D>),
+}
+
+fn mk_bigearthnet_safetensor(
+    datakeypair: &DataKeyPair,
+) -> SupportedWrapper<Dim<[usize; 2]>> {
+    let dataset =
+        Dataset::open(datakeypair.path.clone()).expect("Current file should have read access!");
+    let band1 = dataset
+        .rasterband(1)
+        .expect("Tiff files should contain at least one band!");
+    // tuples (x, y) are in (cols, rows) order
+    // `window` is the (x, y) coordinate of the upper left corner of the region to read
+    let window = (0, 0);
+    // `window_size` is the amount to read -> We will always read everything!
+    let window_size = band1.size();
+    // assert_eq!(band1.band_type(), GdalDataType::UInt16);
+    match band1.band_type() {
+        GdalDataType::UInt16 => SupportedWrapper::U16(
+            band1
+                .read_as_array::<u16>(window, window_size, window_size, None)
+                .expect("File should open correctly. Report bug!"),
+        ),
+        GdalDataType::Float32 => SupportedWrapper::F32(
+            band1
+                .read_as_array::<f32>(window, window_size, window_size, None)
+                .expect("File should open correctly. Report bug!"),
+        ),
+        _ => panic!("Unsupported data type detected!"),
+    }
+}
+
 /// Given a vector of `DataKeyPair` elements, iterate through all elements `d`,
 /// read the raster data from `d.path`, and interpret the
 /// data as safetensors data.
 /// Then construct the safetensors value and return the resulting
 /// data vector.
-fn mk_safetensor(pairs: &Vec<DataKeyPair>) -> anyhow::Result<Vec<u8>> {
-    let it = pairs.into_iter().map(|e| {
-        let dataset = Dataset::open(e.path.clone()).expect("Current file should have read access!");
-        let band1 = dataset
-            .rasterband(1)
-            .expect("Tiff files should contain at least one band!");
-        // tuples (x, y) are in (cols, rows) order
-        // `window` is the (x, y) coordinate of the upper left corner of the region to read
-        let window = (0, 0);
-        // `window_size` is the amount to read -> We will always read everything!
-        let window_size = band1.size();
-        // assert_eq!(band1.band_type(), GdalDataType::UInt16);
-        match band1.band_type() {
-            GdalDataType::UInt16 => SupportedWrapper::U16(
-                band1
-                    .read_as_array::<u16>(window, window_size, window_size, None)
-                    .expect("File should open correctly. Report bug!"),
-            ),
-            GdalDataType::Float32 => SupportedWrapper::F32(
-                band1
-                    .read_as_array::<f32>(window, window_size, window_size, None)
-                    .expect("File should open correctly. Report bug!"),
-            ),
-            _ => panic!("Unsupported data type detected!"),
-        }
-        // let arr = band1
-        //     .read_as_array::<u16>(window, window_size, window_size, None)
-        //     .expect("File should open correctly. Report bug!");
-
-        // (e.safetensor_key.clone(), Wrapper(arr))
+fn mk_safetensors(pairs: &Vec<TypedDataKeyPair>) -> anyhow::Result<Vec<u8>> {
+    let it = pairs.into_iter().map(|e| match e {
+        TypedDataKeyPair::BigEarthNetS1(e) => mk_bigearthnet_safetensor(&e),
+        TypedDataKeyPair::BigEarthNetS2(e) => mk_bigearthnet_safetensor(&e),
+        // _ => panic!("Not implemented yet!"),
     });
     Ok(serialize(
-        pairs.iter().map(|e| e.safetensor_key.clone()).zip(it),
+        pairs
+            .iter()
+            .map(|e| e.get_safetensors_key().clone())
+            .zip(it),
        &None,
    )?)
 }
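Finally, the value stored per LMDB key is a plain safetensors byte buffer, so a reader can recover each band by its key. A round-trip sketch assuming the `safetensors` and `anyhow` crates (the band key and tensor contents are hypothetical, and `TensorView` stands in for the patch's `SupportedWrapper`):

```rust
use safetensors::tensor::{serialize, Dtype, SafeTensors, TensorView};

fn main() -> anyhow::Result<()> {
    // a 2x2 u16 "band", serialized as little-endian bytes
    let data: Vec<u8> = [1u16, 2, 3, 4].iter().flat_map(|v| v.to_le_bytes()).collect();
    let view = TensorView::new(Dtype::U16, vec![2, 2], &data)?;
    // same call shape as in `mk_safetensors`: (key, tensor) pairs plus empty metadata
    let bytes = serialize([("B02", view)], &None)?;
    // what a consumer of the LMDB value would do:
    let tensors = SafeTensors::deserialize(&bytes)?;
    assert_eq!(tensors.tensor("B02")?.shape(), &[2, 2]);
    Ok(())
}
```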