Skip to content

Commit

Permalink
WIP: Initial implementation of match_values() and preview_value_mappi…
Browse files Browse the repository at this point in the history
…ngs()
  • Loading branch information
aecio committed Jun 12, 2024
1 parent 176fd14 commit 72b3592
Show file tree
Hide file tree
Showing 3 changed files with 2,300 additions and 2 deletions.
199 changes: 198 additions & 1 deletion bdikit/functional_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from enum import Enum
from os.path import join, dirname
from typing import Union, Type, List, Optional
from typing import Union, Type, List, Dict, TypedDict, Set, Optional, Tuple
import pandas as pd
import numpy as np
from bdikit.utils import get_gdc_data
from bdikit.download import get_cached_model_or_download
from bdikit.mapping_algorithms.column_mapping.algorithms import (
BaseColumnMappingAlgorithm,
Expand All @@ -18,6 +20,17 @@
from bdikit.mapping_algorithms.scope_reducing._algorithms.contrastive_learning.cl_api import (
ContrastiveLearningAPI,
)
from bdikit.mapping_algorithms.value_mapping.algorithms import (
ValueMatch,
BaseAlgorithm,
TFIDFAlgorithm,
LLMAlgorithm,
EditAlgorithm,
EmbeddingAlgorithm,
AutoFuzzyJoinAlgorithm,
FastTextAlgorithm,
)


GDC_DATA_PATH = join(dirname(__file__), "./resource/gdc_table.csv")

Expand Down Expand Up @@ -120,6 +133,33 @@ def top_matches(
return pd.concat(dfs, ignore_index=True)


class ValueMatchingMethod(Enum):
    """Registry of the supported value matching algorithms.

    Each member's value is a ``(method_name, method_class)`` tuple: the
    string identifier of the algorithm and the class that implements it.
    """

    TFIDF = ("tfidf", TFIDFAlgorithm)
    EDIT = ("edit_distance", EditAlgorithm)
    EMBEDDINGS = ("embedding", EmbeddingAlgorithm)
    AUTOFJ = ("auto_fuzzy_join", AutoFuzzyJoinAlgorithm)
    FASTTEXT = ("fasttext", FastTextAlgorithm)
    GPT = ("gpt", LLMAlgorithm)

    def __init__(self, method_name: str, method_class: Type[BaseAlgorithm]):
        # Enum unpacks each member's tuple value into these two arguments.
        self.method_name = method_name
        self.method_class = method_class

    @staticmethod
    def get_instance(method_name: str) -> BaseAlgorithm:
        """Create a new instance of the algorithm identified by *method_name*.

        Args:
            method_name: Either the algorithm identifier (e.g.
                ``"edit_distance"``) or the enum member name (e.g. ``"EDIT"``).

        Returns:
            A freshly constructed algorithm instance.

        Raises:
            ValueError: If *method_name* does not identify a supported
                algorithm.
        """
        methods = {
            method.method_name: method.method_class for method in ValueMatchingMethod
        }
        method_class = methods.get(method_name)
        if method_class is None:
            # Also accept the enum member name, so that values such as
            # ``ValueMatchingMethod.EDIT.name`` resolve to an algorithm.
            member = ValueMatchingMethod.__members__.get(method_name)
            if member is None:
                names = ", ".join(methods)
                raise ValueError(
                    f"The {method_name} algorithm is not supported. "
                    f"Supported algorithms are: {names}"
                )
            method_class = member.method_class
        # Instantiate outside of any lookup error handling so that errors
        # raised by the algorithm constructor are not reported as
        # "algorithm is not supported".
        return method_class()


def materialize_mapping(
input_dataframe: pd.DataFrame, target: List[dict]
) -> pd.DataFrame:
Expand All @@ -140,3 +180,160 @@ def map_column_values(
new_column = value_mapper.map(input_column)
new_column.name = target
return new_column


class ValueMatchingResult(TypedDict):
    """Result of matching the values of one source column to a target column."""

    # Name of the target column that the source column was mapped to.
    target_column: str
    # Value matches found between the source values and the target domain.
    matches: List[ValueMatch]
    # Ratio of matches to distinct source values.
    coverage: float
    # Distinct source values, converted to str and stripped of whitespace.
    unique_values: Set[str]
    # Source values for which no match was found.
    unmatch_values: Set[str]


def match_values(
    source: pd.DataFrame,
    target: Union[str, pd.DataFrame],
    column_mapping: pd.DataFrame,
    method: str = ValueMatchingMethod.EDIT.method_name,
) -> Dict[str, ValueMatchingResult]:
    """
    Maps the values of the dataset columns to the target domain using the given method name.

    Args:
        source: The dataset whose column values are matched.
        target: Either the string ``"gdc"`` or a DataFrame whose columns
            define the target domains (the distinct values of each column).
        column_mapping: A DataFrame with ``"source"`` and ``"target"``
            columns that pairs source columns with target columns.
        method: The identifier of the value matching algorithm, as accepted
            by ``ValueMatchingMethod.get_instance`` (default:
            ``"edit_distance"``).

    Returns:
        A dictionary mapping each matched source column name to its
        ``ValueMatchingResult``.

    Raises:
        ValueError: If *target* is neither ``"gdc"`` nor a DataFrame, or if
            *method* is not a supported algorithm.
    """
    if isinstance(target, str) and target == "gdc":
        column_names = column_mapping["target"].unique().tolist()
        target_domain = get_gdc_data(column_names)
    elif isinstance(target, pd.DataFrame):
        # Missing values are not part of a column's domain; dropping them
        # also keeps NaN (a float) away from the string handling downstream.
        target_domain = {
            column_name: target[column_name].dropna().unique().tolist()
            for column_name in target.columns
        }
    else:
        raise ValueError(
            "The target must be a DataFrame or a standard vocabulary name."
        )

    column_mapping_dict = column_mapping.set_index("source")["target"].to_dict()
    value_matcher = ValueMatchingMethod.get_instance(method)
    return _match_values(source, target_domain, column_mapping_dict, value_matcher)


def _match_values(
    dataset: pd.DataFrame,
    target_domain: Dict[str, Optional[List[str]]],
    column_mapping: Dict[str, str],
    value_matcher: BaseAlgorithm,
) -> Dict[str, ValueMatchingResult]:
    """Match the values of each mapped source column against its target domain.

    Args:
        dataset: The DataFrame that contains the source columns.
        target_domain: For each target column name, the list of its valid
            values, or ``None``/empty when the column has no value domain.
        column_mapping: Maps source column names to target column names.
        value_matcher: The algorithm used to match source to target values.

    Returns:
        A dictionary keyed by source column name. Columns whose target has
        no value domain, or whose source values are skipped (empty column or
        rejected by ``_skip_values``), are not included.
    """
    mapping_results: Dict[str, ValueMatchingResult] = {}

    for source_column, target_column in column_mapping.items():

        # 1. Select candidate columns for value mapping
        target_domain_list = target_domain[target_column]
        if target_domain_list is None or len(target_domain_list) == 0:
            continue

        unique_values = dataset[source_column].unique()
        # An empty column has nothing to match (and would otherwise lead to
        # an IndexError / division by zero further down).
        if len(unique_values) == 0 or _skip_values(unique_values):
            continue

        # 2. Transform the unique values to lowercase, remembering the
        #    original spelling so it can be restored after matching
        source_values_dict: Dict[str, str] = {
            str(x).strip().lower(): str(x).strip() for x in unique_values
        }
        target_values_dict: Dict[str, str] = {x.lower(): x for x in target_domain_list}

        # 3. Apply the value matcher to create value mapping dictionaries
        matches_lowercase = value_matcher.match(
            list(source_values_dict), list(target_values_dict)
        )

        # 4. Transform the matches to the original case
        matches: List[ValueMatch] = [
            ValueMatch(
                current_value=source_values_dict[source_value],
                target_value=target_values_dict[target_value],
                similarity=similarity,
            )
            for source_value, target_value, similarity in matches_lowercase
        ]

        # 5. Calculate the coverage and unmatched values. Coverage counts
        #    distinct matched source values so that it stays within [0, 1]
        #    even if the matcher returns several matches for one value.
        source_values = set(source_values_dict.values())
        matched_values = {match.current_value for match in matches}
        coverage = len(matched_values) / len(source_values)

        mapping_results[source_column] = ValueMatchingResult(
            target_column=target_column,
            matches=matches,
            coverage=coverage,
            unique_values=source_values,
            unmatch_values=source_values - matched_values,
        )

    return mapping_results


def _skip_values(unique_values: np.ndarray, max_length: int = 50):
if isinstance(unique_values[0], float):
return True
elif len(unique_values) > max_length:
return True
else:
return False


def preview_value_mappings(
    dataset: pd.DataFrame,
    column_mapping: Union[Tuple[str, str], pd.DataFrame],
    target: Union[str, pd.DataFrame] = "gdc",
    method: str = "edit_distance",
) -> pd.DataFrame:
    """
    Build a human-readable table of the value mappings.

    Args:
        dataset: The dataset whose column values are matched.
        column_mapping: Either a ``(source, target)`` tuple of column names
            or a DataFrame with ``"source"`` and ``"target"`` columns.
        target: The target passed on to ``match_values`` (default ``"gdc"``).
        method: The name of the value matching algorithm.

    Returns:
        A DataFrame with the columns ``source_column``, ``source``,
        ``target`` and ``similarity``. For each source column the matched
        values come first, followed by the unmatched values with empty
        ``target`` and ``similarity`` cells. The DataFrame is empty when no
        column produced a value mapping.

    Raises:
        ValueError: If *column_mapping* is neither a tuple nor a DataFrame.
    """
    match_columns = ["source", "target", "similarity"]
    output_columns = ["source_column"] + match_columns

    if isinstance(column_mapping, pd.DataFrame):
        mapping_df = column_mapping
    elif isinstance(column_mapping, tuple):
        mapping_df = pd.DataFrame(
            [
                {
                    "source": column_mapping[0],
                    "target": column_mapping[1],
                }
            ]
        )
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise ValueError(
            "The column_mapping must be a DataFrame or a (source, target) tuple."
        )

    value_mappings = match_values(
        dataset, target=target, column_mapping=mapping_df, method=method
    )

    dfs = []
    for source_column, match_result in value_mappings.items():
        column_matches_df = pd.DataFrame(
            data=match_result["matches"],
            columns=match_columns,
        )
        column_matches_df["source_column"] = source_column
        dfs.append(column_matches_df)

        # Sort the unmatched values so the preview is deterministic
        # (iteration order of a set of strings varies between runs).
        unmatched_df = pd.DataFrame(
            data=[(value, "", "") for value in sorted(match_result["unmatch_values"])],
            columns=match_columns,
        )
        unmatched_df["source_column"] = source_column
        dfs.append(unmatched_df)

    # pd.concat raises ValueError on an empty list, which happens when
    # every column was skipped by the value matching step.
    if not dfs:
        return pd.DataFrame(columns=output_columns)

    df = pd.concat(dfs, ignore_index=True)
    return df[output_columns]
Loading

0 comments on commit 72b3592

Please sign in to comment.