Skip to content

Commit

Permalink
Closes #2838: Expand dataframe merge functions to accept multiple col…
Browse files Browse the repository at this point in the history
…umns (#2848)

This PR (closes #2838) expands the dataframe merge functions to act on multiple columns. When no value is provided for `on`, it defaults to the intersection of the columns of the left and right dataframe. `inner_join_merge` and `right_join_merge` were turned into helper functions that aren't exposed to the user to more closely match the pandas merge functionality where these are only avialble through `merge`

Co-authored-by: Pierce Hayes <pierce314159@users.noreply.github.com>
  • Loading branch information
stress-tess and Pierce Hayes authored Nov 15, 2023
1 parent aa3afc4 commit b7a0c22
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 150 deletions.
31 changes: 31 additions & 0 deletions PROTO_tests/tests/dataframe_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,3 +666,34 @@ def test_subset(self):
assert df.index.to_list() == df2.index.to_list()
assert df["a"].to_list() == df2["a"].to_list()
assert df["b"].to_list() == df2["b"].to_list()

def test_multi_col_merge(self):
size = 1000
seed = 1
a = ak.randint(-size // 10, size // 10, size, seed=seed)
b = ak.randint(-size // 10, size // 10, size, seed=seed + 1)
c = ak.randint(-size // 10, size // 10, size, seed=seed + 2)
d = ak.randint(-size // 10, size // 10, size, seed=seed + 3)
left_df = ak.DataFrame({"first": a, "second": b, "third": ak.ones(size, int)})
right_df = ak.DataFrame(
{"first": c, "second": d, "third": ak.cast(ak.arange(size) % 2 == 0, int)}
)
l_pd, r_pd = left_df.to_pandas(), right_df.to_pandas()

for how in "inner", "left", "right":
for on in "first", "second", "third", ["first", "third"], ["second", "third"], None:
ak_merge = ak.merge(left_df, right_df, on=on, how=how)
pd_merge = pd.merge(l_pd, r_pd, on=on, how=how)

sorted_columns = sorted(ak_merge.columns)
assert sorted_columns == sorted(pd_merge.columns.to_list())
sorted_ak = ak_merge.sort_values(sorted_columns).reset_index()
sorted_pd = pd_merge.sort_values(sorted_columns).reset_index(drop=True)
for col in sorted_columns:
assert np.allclose(
sorted_ak[col].to_ndarray(), sorted_pd[col].to_numpy(), equal_nan=True
)
# TODO arkouda seems to be sometimes convert columns to floats on a right merge
# when pandas doesnt. Eventually we want to test frame_equal, not just value equality
# from pandas.testing import assert_frame_equal
# assert_frame_equal(sorted_ak.to_pandas()[sorted_columns], sorted_pd[sorted_columns])
224 changes: 77 additions & 147 deletions arkouda/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import pandas as pd # type: ignore
from typeguard import typechecked

from arkouda.alignment import find
from arkouda.categorical import Categorical
from arkouda.client import generic_msg, maxTransferBytes
from arkouda.client_dtypes import BitVector, Fields, IPv4
Expand All @@ -26,8 +25,8 @@
from arkouda.numeric import cast as akcast
from arkouda.numeric import cumsum, where
from arkouda.pdarrayclass import RegistrationError, pdarray
from arkouda.pdarraycreation import arange, array, create_pdarray, zeros
from arkouda.pdarraysetops import concatenate, in1d, intersect1d, setdiff1d
from arkouda.pdarraycreation import arange, array, create_pdarray, full, zeros
from arkouda.pdarraysetops import concatenate, in1d, intersect1d
from arkouda.row import Row
from arkouda.segarray import SegArray
from arkouda.series import Series
Expand All @@ -45,8 +44,6 @@
"intersect",
"invert_permutation",
"intx",
"inner_join_merge",
"right_join_merge",
"merge",
]

Expand Down Expand Up @@ -2243,72 +2240,12 @@ def numeric_help(d):
ret_dict = json.loads(generic_msg(cmd="corrMatrix", args=args))
return DataFrame({c: create_pdarray(ret_dict[c]) for c in self.columns})

@typechecked
def inner_join_merge(
self,
right: DataFrame,
on: str,
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
"""
Utilizes the ak.join.inner_join function to return an ak
DataFrame object containing only rows that are in both
self and right Dataframes, (based on the "on" param),
as well as their associated values. For this function self
is considered the left dataframe.
Parameters
----------
right : DataFrame
The Right DataFrame to be joined
on : str
The name of the DataFrame column the join is being
performed on
left_suffix: str = "_x"
A string indicating the suffix to add to columns from self for overlapping
column names in both left and right. Defaults to "_x"
right_suffix: str = "_y"
A string indicating the suffix to add to columns from the other dataframe for overlapping
column names in both left and right. Defaults to "_y"
Returns
-------
DataFrame
Inner-Joined Arkouda DataFrame
"""
return inner_join_merge(self, right, on, left_suffix, right_suffix)

def right_join_merge(self, right: DataFrame, on: str) -> DataFrame:
"""
Utilizes the ak.join.inner_join_merge function to return an
ak DataFrame object containing all the rows in the right Dataframe,
as well as corresponding rows in self (based on the "on" param),
and all of their associated values. For this function self
is considered the left dataframe.
Based on pandas merge functionality.
Parameters
----------
right : DataFrame
The Right DataFrame to be joined
on : str
The name of the DataFrame column the join is being
performed on
Returns
-------
DataFrame
Right-Joined Arkouda DataFrame
"""
return right_join_merge(self, right, on)

@typechecked
def merge(
self,
right: DataFrame,
on: str,
how: str,
on: Optional[Union[str, List[str]]] = None,
how: str = "inner",
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
Expand All @@ -2325,10 +2262,10 @@ def merge(
----------
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
how: str
on: Optional[Union[str, List[str]]] = None
The name or list of names of the DataFrame column(s) to join on.
If on is None, this defaults to the intersection of the columns in both DataFrames.
how: str = "inner",
The merge condition.
Must be "inner", "left", or "right"
left_suffix: str = "_x"
Expand All @@ -2342,18 +2279,10 @@ def merge(
-------
DataFrame
Joined Arkouda DataFrame
"""
if how == "inner":
return inner_join_merge(self, right, on, left_suffix, right_suffix)
elif how == "right":
return right_join_merge(self, right, on)
elif how == "left":
return right_join_merge(right, self, on)
else:
raise ValueError(
f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'"
)
Note: Multiple column joins are only supported for integer columns
"""
return merge(self, right, on, how, left_suffix, right_suffix)

@typechecked
def register(self, user_defined_name: str) -> DataFrame:
Expand Down Expand Up @@ -2826,10 +2755,11 @@ def invert_permutation(perm):


@typechecked
def inner_join_merge(
def _inner_join_merge(
left: DataFrame,
right: DataFrame,
on: str,
on: Union[str, List[str]],
col_intersect: Union[str, List[str]],
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
Expand All @@ -2844,9 +2774,9 @@ def inner_join_merge(
The Left DataFrame to be joined
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
on: Optional[Union[str, List[str]]] = None
The name or list of names of the DataFrame column(s) to join on.
If on is None, this defaults to the intersection of the columns in both DataFrames.
left_suffix: str = "_x"
A string indicating the suffix to add to columns from the left dataframe for overlapping
column names in both left and right. Defaults to "_x"
Expand All @@ -2858,36 +2788,33 @@ def inner_join_merge(
DataFrame
Inner-Joined Arkouda DataFrame
"""

left_inds, right_inds = inner_join(left[on], right[on])

left_cols = left.columns.copy()
left_cols.remove(on)
right_cols = right.columns.copy()
right_cols.remove(on)

new_dict = {on: left[on][left_inds]}
left_cols, right_cols = left.columns.copy(), right.columns.copy()
if isinstance(on, str):
left_inds, right_inds = inner_join(left[on], right[on])
new_dict = {on: left[on][left_inds]}
left_cols.remove(on)
right_cols.remove(on)
else:
left_inds, right_inds = inner_join([left[col] for col in on], [right[col] for col in on])
new_dict = {col: left[col][left_inds] for col in on}
for col in on:
left_cols.remove(col)
right_cols.remove(col)

for col in left_cols:
if col in right_cols:
new_col = col + left_suffix
else:
new_col = col
new_col = col + left_suffix if col in col_intersect else col
new_dict[new_col] = left[col][left_inds]
for col in right_cols:
if col in left_cols:
new_col = col + right_suffix
else:
new_col = col
new_col = col + right_suffix if col in col_intersect else col
new_dict[new_col] = right[col][right_inds]

return DataFrame(new_dict)


def right_join_merge(
def _right_join_merge(
left: DataFrame,
right: DataFrame,
on: str,
on: Union[str, List[str]],
col_intersect: Union[str, List[str]],
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
Expand All @@ -2903,9 +2830,9 @@ def right_join_merge(
The Left DataFrame to be joined
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
on: Optional[Union[str, List[str]]] = None
The name or list of names of the DataFrame column(s) to join on.
If on is None, this defaults to the intersection of the columns in both DataFrames.
left_suffix: str = "_x"
A string indicating the suffix to add to columns from the left dataframe for overlapping
column names in both left and right. Defaults to "_x"
Expand All @@ -2917,49 +2844,44 @@ def right_join_merge(
DataFrame
Right-Joined Arkouda DataFrame
"""
in_left = _inner_join_merge(left, right, on, col_intersect, left_suffix, right_suffix)
in_left_cols, left_cols = in_left.columns.copy(), left.columns.copy()
if isinstance(on, str):
left_at_on = left[on]
right_at_on = right[on]
left_cols.remove(on)
in_left_cols.remove(on)
else:
left_at_on = [left[col] for col in on]
right_at_on = [right[col] for col in on]
for col in on:
left_cols.remove(col)
in_left_cols.remove(col)

left_cols = left.columns.copy()
left_cols.remove(on)

in_left = inner_join_merge(left, right, on, left_suffix, right_suffix)
in_left_cols = in_left.columns.copy()
in_left_cols.remove(on)

not_in_left = right[find(setdiff1d(right[on], left[on]), right[on])]
not_in_left = right[~in1d(right_at_on, left_at_on)]
for col in not_in_left.columns:
if col in left_cols:
new_col = col + right_suffix
not_in_left[new_col] = not_in_left[col]
not_in_left[col + right_suffix] = not_in_left[col]
not_in_left = not_in_left.drop(col, axis=1)

nan_cols = list(set(in_left) - set(in_left).intersection(set(not_in_left)))

for col in nan_cols:
# Create a nan array for all values not in the left df
nan_arr = zeros(len(not_in_left))
nan_arr.fill(np.nan)
left_col_type = type(in_left[col])
nan_arr = full(len(not_in_left), np.nan)
if in_left[col].dtype == int:
in_left[col] = akcast(in_left[col], akfloat64)
else:
nan_arr = akcast(nan_arr, in_left[col].dtype)

try:
not_in_left[col] = left_col_type(nan_arr)
except TypeError:
not_in_left[col] = nan_arr

right_ak_df = DataFrame.append(in_left, not_in_left)

return right_ak_df
not_in_left[col] = nan_arr
return DataFrame.append(in_left, not_in_left)


@typechecked
def merge(
left: DataFrame,
right: DataFrame,
on: str,
how: str,
on: Optional[Union[str, List[str]]] = None,
how: str = "inner",
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
Expand All @@ -2976,12 +2898,12 @@ def merge(
The Left DataFrame to be joined
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
how: str
on: Optional[Union[str, List[str]]] = None
The name or list of names of the DataFrame column(s) to join on.
If on is None, this defaults to the intersection of the columns in both DataFrames.
how: str = "inner"
The merge condition.
Must be "inner", "left", or "right"
Must be one of "inner", "left", or "right"
left_suffix: str = "_x"
A string indicating the suffix to add to columns from the left dataframe for overlapping
column names in both left and right. Defaults to "_x". Only used when how is "inner"
Expand All @@ -2992,13 +2914,21 @@ def merge(
-------
DataFrame
Joined Arkouda DataFrame
"""
if how == 'inner':
return inner_join_merge(left, right, on, left_suffix, right_suffix)
elif how == 'right':
return right_join_merge(left, right, on, left_suffix, right_suffix)
elif how == 'left':
return right_join_merge(right, left, on, right_suffix, left_suffix)
Note: Multiple column joins are only supported for integer columns
"""
col_intersect = list(set(left.columns) & set(right.columns))
on = on if on is not None else col_intersect

if not isinstance(on, str):
if not all(isinstance(left[col], pdarray) and isinstance(right[col], pdarray) for col in on):
raise ValueError("All columns of a multi-column merge must be pdarrays")

if how == "inner":
return _inner_join_merge(left, right, on, col_intersect, left_suffix, right_suffix)
elif how == "right":
return _right_join_merge(left, right, on, col_intersect, left_suffix, right_suffix)
elif how == "left":
return _right_join_merge(right, left, on, col_intersect, right_suffix, left_suffix)
else:
raise ValueError(f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'")
2 changes: 1 addition & 1 deletion arkouda/pdarrayclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def __del__(self):
try:
logger.debug(f"deleting pdarray with name {self.name}")
generic_msg(cmd="delete", args={"name": self.name})
except RuntimeError:
except (RuntimeError, AttributeError):
pass

def __bool__(self) -> builtins.bool:
Expand Down
Loading

0 comments on commit b7a0c22

Please sign in to comment.