Commit

WIP: Initial implementation of match_values() and preview_value_mappings()

aecio committed Jun 14, 2024
1 parent 176fd14 commit 0ae6af3
Showing 5 changed files with 2,905 additions and 4 deletions.
263 changes: 262 additions & 1 deletion bdikit/functional_api.py
@@ -1,7 +1,9 @@
from enum import Enum
from os.path import join, dirname
from typing import Union, Type, List, Optional
from typing import Union, Type, List, Dict, TypedDict, Set, Optional, Tuple, Callable
import pandas as pd
import numpy as np
from bdikit.utils import get_gdc_data
from bdikit.download import get_cached_model_or_download
from bdikit.mapping_algorithms.column_mapping.algorithms import (
BaseColumnMappingAlgorithm,
@@ -18,6 +20,23 @@
from bdikit.mapping_algorithms.scope_reducing._algorithms.contrastive_learning.cl_api import (
ContrastiveLearningAPI,
)
from bdikit.mapping_algorithms.value_mapping.algorithms import (
ValueMatch,
BaseAlgorithm,
TFIDFAlgorithm,
LLMAlgorithm,
EditAlgorithm,
EmbeddingAlgorithm,
AutoFuzzyJoinAlgorithm,
FastTextAlgorithm,
)
from bdikit.mapping_algorithms.value_mapping.value_mappers import (
ValueMapper,
FunctionValueMapper,
DictionaryMapper,
IdentityValueMapper,
)


GDC_DATA_PATH = join(dirname(__file__), "./resource/gdc_table.csv")

@@ -120,6 +139,33 @@ def top_matches(
return pd.concat(dfs, ignore_index=True)


class ValueMatchingMethod(Enum):
TFIDF = ("tfidf", TFIDFAlgorithm)
EDIT = ("edit_distance", EditAlgorithm)
EMBEDDINGS = ("embedding", EmbeddingAlgorithm)
AUTOFJ = ("auto_fuzzy_join", AutoFuzzyJoinAlgorithm)
FASTTEXT = ("fasttext", FastTextAlgorithm)
GPT = ("gpt", LLMAlgorithm)

def __init__(self, method_name: str, method_class: Type[BaseAlgorithm]):
self.method_name = method_name
self.method_class = method_class

@staticmethod
def get_instance(method_name: str) -> BaseAlgorithm:
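        # Illustrative: get_instance("tfidf") returns a fresh TFIDFAlgorithm
        # instance; an unsupported name raises the ValueError below.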
methods = {
method.method_name: method.method_class for method in ValueMatchingMethod
}
try:
return methods[method_name]()
except KeyError:
names = ", ".join(list(methods.keys()))
raise ValueError(
f"The {method_name} algorithm is not supported. "
f"Supported algorithms are: {names}"
)


def materialize_mapping(
input_dataframe: pd.DataFrame, target: List[dict]
) -> pd.DataFrame:
Expand All @@ -140,3 +186,218 @@ def map_column_values(
new_column = value_mapper.map(input_column)
new_column.name = target
return new_column


class ValueMatchingResult(TypedDict):
target_column: str
matches: List[ValueMatch]
coverage: float
unique_values: Set[str]
unmatch_values: Set[str]
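

# Illustrative shape of one match_values() result entry (values made up):
# {
#     "target_column": "tgt_column",
#     "matches": [ValueMatch(current_value="Banana", target_value="banana", similarity=1.0)],
#     "coverage": 0.25,
#     "unique_values": {"Red Apple", "Banana", "Oorange", "Strawberry"},
#     "unmatch_values": {"Red Apple", "Oorange", "Strawberry"},
# }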


def match_values(
source: pd.DataFrame,
target: Union[str, pd.DataFrame],
column_mapping: pd.DataFrame,
    method: str = ValueMatchingMethod.EDIT.method_name,
) -> Dict[str, ValueMatchingResult]:
"""
Maps the values of the dataset columns to the target domain using the given method name.
"""
if isinstance(target, str) and target == "gdc":
column_names = column_mapping["target"].unique().tolist()
target_domain = get_gdc_data(column_names)
elif isinstance(target, pd.DataFrame):
target_domain = {
column_name: target[column_name].unique().tolist()
for column_name in target.columns
}
else:
raise ValueError(
"The target must be a DataFrame or a standard vocabulary name."
)

column_mapping_dict = column_mapping.set_index("source")["target"].to_dict()
value_matcher = ValueMatchingMethod.get_instance(method)
matches = _match_values(source, target_domain, column_mapping_dict, value_matcher)
return matches


def _match_values(
dataset: pd.DataFrame,
target_domain: Dict[str, Optional[List[str]]],
column_mapping: Dict[str, str],
value_matcher: BaseAlgorithm,
) -> Dict[str, ValueMatchingResult]:

    mapping_results: Dict[str, ValueMatchingResult] = {}

for source_column, target_column in column_mapping.items():

# 1. Select candidate columns for value mapping
target_domain_list = target_domain[target_column]
if target_domain_list is None or len(target_domain_list) == 0:
continue

unique_values = dataset[source_column].unique()
if _skip_values(unique_values):
continue

# 2. Transform the unique values to lowercase
source_values_dict: Dict[str, str] = {
str(x).strip().lower(): str(x).strip() for x in unique_values
}
target_values_dict: Dict[str, str] = {x.lower(): x for x in target_domain_list}

# 3. Apply the value matcher to create value mapping dictionaries
matches_lowercase = value_matcher.match(
list(source_values_dict.keys()), list(target_values_dict.keys())
)

# 4. Transform the matches to the original case
matches: List[ValueMatch] = []
for source_value, target_value, similarity in matches_lowercase:
matches.append(
ValueMatch(
current_value=source_values_dict[source_value],
target_value=target_values_dict[target_value],
similarity=similarity,
)
)

# 5. Calculate the coverage and unmatched values
coverage = len(matches) / len(source_values_dict)
source_values = set(source_values_dict.values())
        matched_values = set(match.current_value for match in matches)

mapping_results[source_column] = ValueMatchingResult(
target_column=target_column,
matches=matches,
coverage=coverage,
unique_values=source_values,
            unmatch_values=source_values - matched_values,
)

return mapping_results


def _skip_values(unique_values: np.ndarray, max_length: int = 50) -> bool:
    # Heuristic: skip numeric (float) columns and high-cardinality columns,
    # which are unlikely to hold categorical values worth matching.
    if len(unique_values) == 0 or isinstance(unique_values[0], float):
        return True
    return len(unique_values) > max_length


def preview_value_mappings(
    dataset: pd.DataFrame,
    column_mapping: Union[pd.DataFrame, Tuple[str, str]],
    target: Union[str, pd.DataFrame] = "gdc",
    method: str = "edit_distance",
) -> pd.DataFrame:
    """
    Previews the value mappings as a single DataFrame with one row per
    matched (or unmatched) source value.
    """
    if isinstance(column_mapping, pd.DataFrame):
        mapping_df = column_mapping
    elif isinstance(column_mapping, tuple):
        mapping_df = pd.DataFrame(
            [
                {
                    "source": column_mapping[0],
                    "target": column_mapping[1],
                }
            ]
        )
    else:
        raise ValueError(
            "column_mapping must be a DataFrame or a (source, target) tuple"
        )

value_mappings = match_values(
dataset, target=target, column_mapping=mapping_df, method=method
)

dfs = []
for source_column, match_result in value_mappings.items():
column_matches_df = pd.DataFrame(
data=match_result["matches"],
columns=["source", "target", "similarity"],
)
column_matches_df["source_column"] = source_column
dfs.append(column_matches_df)

unmatched_df = pd.DataFrame(
data=list(
zip(
match_result["unmatch_values"],
[""] * len(match_result["unmatch_values"]),
[""] * len(match_result["unmatch_values"]),
)
),
columns=["source", "target", "similarity"],
)
unmatched_df["source_column"] = source_column
dfs.append(unmatched_df)

df = pd.concat(dfs, ignore_index=True)
df = df[["source_column", "source", "target", "similarity"]]
return df


def create_mapper(
    m: Union[None, ValueMapper, pd.DataFrame, Dict, Callable[[pd.Series], pd.Series]]
) -> ValueMapper:
    """
    Creates a ValueMapper from a mapping specification: None (identity), an
    existing ValueMapper, a custom function, a match_values() result entry,
    or a DataFrame with "current_value" and "target_value" columns.
    """
if m is None:
return IdentityValueMapper()

if isinstance(m, ValueMapper):
return m

if callable(m):
return FunctionValueMapper(m)

if isinstance(m, dict) and "matches" in m and isinstance(m["matches"], list):
        # This is a dictionary returned by the match_values function
matches = m["matches"]
mapping_dict = {}
for match in matches:
if isinstance(match, ValueMatch):
mapping_dict[match.current_value] = match.target_value
elif isinstance(match, tuple) and len(match) >= 2:
if isinstance(match[0], str) and isinstance(match[1], str):
mapping_dict[match[0]] = match[1]
else:
raise ValueError(
"Tuple in matches must contain two strings: (current_value, target_value)"
)
else:
raise ValueError(
"Matches must be a list of ValueMatch objects or tuples"
)
return DictionaryMapper(mapping_dict)

    if isinstance(m, pd.DataFrame) and all(
        k in m.columns for k in ["current_value", "target_value"]
    ):
        return DictionaryMapper(m.set_index("current_value")["target_value"].to_dict())

    raise ValueError(f"Failed to create a value mapper for the given argument: {m}")
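
# Illustrative calls (hypothetical): each returns a ValueMapper instance.
#   create_mapper(None)                          -> IdentityValueMapper
#   create_mapper(lambda col: col.str.upper())   -> FunctionValueMapper
#   create_mapper(value_mappings["src_column"])  -> DictionaryMapper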


def update_mappings(value_mappings, user_mappings) -> List:
    """
    Merges automatically computed value mappings with user-provided mappings;
    user mappings take precedence over computed ones for the same column pair.
    """
    user_mappings_dict = {
        user_mapping["from"] + "__" + user_mapping["to"]: user_mapping
        for user_mapping in user_mappings
    }
    final_mappings = []
    for source_column, mapping in value_mappings.items():
        key = source_column + "__" + mapping["target_column"]
        if key not in user_mappings_dict:
            final_mappings.append(
                {
                    "from": source_column,
                    "to": mapping["target_column"],
                    "mapper": create_mapper(mapping),
                }
            )

final_mappings = final_mappings + user_mappings

return final_mappings
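
For orientation, a minimal end-to-end sketch of the new API (hypothetical column names and data; it assumes these functions are re-exported at the package level, as the tests below do via bdi.match_values):

import pandas as pd
import bdikit as bdi

source = pd.DataFrame({"src_column": ["Red Apple", "Banana", "Oorange"]})
target = pd.DataFrame({"tgt_column": ["apple", "banana", "orange"]})
column_mapping = pd.DataFrame({"source": ["src_column"], "target": ["tgt_column"]})

# Match the values of each mapped source column against the target domain.
value_mappings = bdi.match_values(source, target, column_mapping, method="tfidf")

# Or preview matched and unmatched values as a single flat DataFrame.
preview = bdi.preview_value_mappings(
    source, ("src_column", "tgt_column"), target=target, method="tfidf"
)
print(preview)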
13 changes: 12 additions & 1 deletion bdikit/mapping_algorithms/value_mapping/algorithms.py
@@ -3,7 +3,7 @@
from openai import OpenAI
from polyfuzz import PolyFuzz
from polyfuzz.models import EditDistance, TFIDF, Embeddings
from flair.embeddings import TransformerWordEmbeddings
from flair.embeddings import TransformerWordEmbeddings, WordEmbeddings
from autofj import AutoFJ
from Levenshtein import ratio
import pandas as pd
@@ -91,6 +91,17 @@ def __init__(self, model_path: str = "bert-base-multilingual-cased"):
super().__init__(PolyFuzz(method))


class FastTextAlgorithm(PolyFuzzAlgorithm):
"""
Value matching algorithm based on the cosine similarity of FastText embeddings.
"""

def __init__(self, model_name: str = "en-crawl"):
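        # "en-crawl" selects flair's English FastText vectors trained on
        # Common Crawl (downloaded on first use).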
embeddings = WordEmbeddings(model_name)
method = Embeddings(embeddings, min_similarity=0)
super().__init__(PolyFuzz(method))


class LLMAlgorithm(BaseAlgorithm):
def __init__(self):
self.client = OpenAI()
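
As a reference point, the PolyFuzz pipeline that FastTextAlgorithm wraps can be exercised standalone (a sketch using the polyfuzz and flair APIs shown in the imports above):

from flair.embeddings import WordEmbeddings
from polyfuzz import PolyFuzz
from polyfuzz.models import Embeddings

embeddings = WordEmbeddings("en-crawl")
model = PolyFuzz(Embeddings(embeddings, min_similarity=0))
model.match(["red apple", "oorange"], ["apple", "orange"])
print(model.get_matches())  # DataFrame with From, To, Similarity columns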
3 changes: 2 additions & 1 deletion bdikit/mapping_algorithms/value_mapping/value_mappers.py
@@ -1,4 +1,5 @@
import pandas as pd
from typing import Callable


class ValueMapper:
@@ -34,7 +35,7 @@ class FunctionValueMapper(ValueMapper):
provided custom function.
"""

def __init__(self, function):
def __init__(self, function: Callable[[pd.Series], pd.Series]):
self.function = function

def map(self, input_column: pd.Series) -> pd.Series:
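
A quick sketch of the newly typed interface (the callable must map a pd.Series to a pd.Series):

import pandas as pd

upper_mapper = FunctionValueMapper(lambda column: column.str.upper())
upper_mapper.map(pd.Series(["a", "b"]))  # expected: Series(["A", "B"])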
2,598 changes: 2,598 additions & 0 deletions examples/getting_started.ipynb

Large diffs are not rendered by default.

32 changes: 31 additions & 1 deletion tests/test_api.py
@@ -146,7 +146,7 @@ def test_map_column_values():
)

# then
upper_cased_values = ["A", "B", "C", "D", "E"]
upper_cased_values = pd.Series(["A", "B", "C", "D", "E"])
assert mapped_column.name == target_column_name
assert mapped_column.eq(upper_cased_values).all()

@@ -181,3 +181,33 @@ def test_map_dataframe_column_values():

assert "string column 2" in df_mapped.columns
assert df_mapped["string column 2"].eq(["A", "B", "C", "D", "E"]).all()


def test_value_mapping_dataframe():
# given
df_source = pd.DataFrame(
{"src_column": ["Red Apple", "Banana", "Oorange", "Strawberry"]}
)
df_target = pd.DataFrame(
{"tgt_column": ["apple", "banana", "orange", "kiwi", "grapes"]}
)

df_matches = pd.DataFrame({"source": ["src_column"], "target": ["tgt_column"]})

# when
value_mappings = bdi.match_values(df_source, df_target, df_matches, method="tfidf")

# then
assert value_mappings is not None
assert "src_column" in value_mappings
assert value_mappings["src_column"]["matches"] is not None
assert value_mappings["src_column"]["target_column"] == "tgt_column"

src_column_mapping = value_mappings["src_column"]
    assert len(src_column_mapping["matches"]) == 3
