Skip to content

Commit

Permalink
Closes #2716: Add dataframe merge functionality (#2781)
Browse files Browse the repository at this point in the history
* add merge functionality

* moving functionality to dataframe.py

* remove numeric import

* change exception error to TypeError

* int col float behavior

* remove extraneous code and fix type errors

* change the float cast from np to ak

* Update arkouda/dataframe.py

Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com>

* Update arkouda/dataframe.py

Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com>

* Update arkouda/dataframe.py

Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com>

* address some of Pierce's comments

* identical column suffixes

* added df.merge functions

* bug fix for the right_join_merge method

* add merge test for dataframe

* temp test fix, order is wonky but not wrong

* Update arkouda/dataframe.py

---------

Co-authored-by: Eddie <eddie@MacBook-Air.local>
Co-authored-by: pierce <48131946+pierce314159@users.noreply.github.com>
Co-authored-by: Pierce Hayes <pierce314159@users.noreply.github.com>
  • Loading branch information
4 people authored Oct 6, 2023
1 parent 84fbe90 commit f3f1de8
Show file tree
Hide file tree
Showing 2 changed files with 364 additions and 1 deletion.
298 changes: 297 additions & 1 deletion arkouda/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import pandas as pd # type: ignore
from typeguard import typechecked

from arkouda.alignment import find
from arkouda.categorical import Categorical
from arkouda.client import generic_msg, maxTransferBytes
from arkouda.client_dtypes import BitVector, Fields, IPv4
Expand All @@ -21,11 +22,12 @@
from arkouda.groupbyclass import GroupBy as akGroupBy
from arkouda.groupbyclass import unique
from arkouda.index import Index
from arkouda.join import inner_join
from arkouda.numeric import cast as akcast
from arkouda.numeric import cumsum, where
from arkouda.pdarrayclass import RegistrationError, pdarray
from arkouda.pdarraycreation import arange, array, create_pdarray, zeros
from arkouda.pdarraysetops import concatenate, in1d, intersect1d
from arkouda.pdarraysetops import concatenate, in1d, intersect1d, setdiff1d
from arkouda.row import Row
from arkouda.segarray import SegArray
from arkouda.series import Series
Expand All @@ -43,6 +45,9 @@
"intersect",
"invert_permutation",
"intx",
"inner_join_merge",
"right_join_merge",
"merge",
]


Expand Down Expand Up @@ -2238,6 +2243,118 @@ def numeric_help(d):
ret_dict = json.loads(generic_msg(cmd="corrMatrix", args=args))
return DataFrame({c: create_pdarray(ret_dict[c]) for c in self.columns})

@typechecked
def inner_join_merge(
self,
right: DataFrame,
on: str,
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
"""
Utilizes the ak.join.inner_join function to return an ak
DataFrame object containing only rows that are in both
self and right Dataframes, (based on the "on" param),
as well as their associated values. For this function self
is considered the left dataframe.
Parameters
----------
right : DataFrame
The Right DataFrame to be joined
on : str
The name of the DataFrame column the join is being
performed on
left_suffix: str = "_x"
A string indicating the suffix to add to columns from self for overlapping
column names in both left and right. Defaults to "_x"
right_suffix: str = "_y"
A string indicating the suffix to add to columns from the other dataframe for overlapping
column names in both left and right. Defaults to "_y"
Returns
-------
DataFrame
Inner-Joined Arkouda DataFrame
"""
return inner_join_merge(self, right, on, left_suffix, right_suffix)

def right_join_merge(self, right: DataFrame, on: str) -> DataFrame:
"""
Utilizes the ak.join.inner_join_merge function to return an
ak DataFrame object containing all the rows in the right Dataframe,
as well as corresponding rows in self (based on the "on" param),
and all of their associated values. For this function self
is considered the left dataframe.
Based on pandas merge functionality.
Parameters
----------
right : DataFrame
The Right DataFrame to be joined
on : str
The name of the DataFrame column the join is being
performed on
Returns
-------
DataFrame
Right-Joined Arkouda DataFrame
"""
return right_join_merge(self, right, on)

@typechecked
def merge(
self,
right: DataFrame,
on: str,
how: str,
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
"""
Utilizes the ak.join.inner_join_merge and the ak.join.right_join_merge
functions to return a merged Arkouda DataFrame object
containing rows from both DataFrames as specified by the merge
condition (based on the "how" and "on" parameters). For this function self
is considered the left dataframe.
Based on pandas merge functionality.
https://github.com/pandas-dev/pandas/blob/main/pandas/core/reshape/merge.py#L137
Parameters
----------
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
how: str
The merge condition.
Must be "inner", "left", or "right"
left_suffix: str = "_x"
A string indicating the suffix to add to columns from the left dataframe for overlapping
column names in both left and right. Defaults to "_x". Only used when how is "inner"
right_suffix: str = "_y"
A string indicating the suffix to add to columns from the right dataframe for overlapping
column names in both left and right. Defaults to "_y". Only used when how is "inner"
Returns
-------
DataFrame
Joined Arkouda DataFrame
"""

if how == "inner":
return inner_join_merge(self, right, on, left_suffix, right_suffix)
elif how == "right":
return right_join_merge(self, right, on)
elif how == "left":
return right_join_merge(right, self, on)
else:
raise ValueError(
f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'"
)

@typechecked
def register(self, user_defined_name: str) -> DataFrame:
"""
Expand Down Expand Up @@ -2706,3 +2823,182 @@ def invert_permutation(perm):
if (unique(perm).size != perm.size) and (perm.size != rng + 1):
raise ValueError("The array is not a permutation.")
return coargsort([perm, arange(perm.size)])


@typechecked
def inner_join_merge(
left: DataFrame,
right: DataFrame,
on: str,
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
"""
Utilizes the ak.join.inner_join function to return an ak
DataFrame object containing only rows that are in both
the left and right Dataframes, (based on the "on" param),
as well as their associated values.
Parameters
----------
left: DataFrame
The Left DataFrame to be joined
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
left_suffix: str = "_x"
A string indicating the suffix to add to columns from the left dataframe for overlapping
column names in both left and right. Defaults to "_x"
right_suffix: str = "_y"
A string indicating the suffix to add to columns from the right dataframe for overlapping
column names in both left and right. Defaults to "_y"
Returns
-------
DataFrame
Inner-Joined Arkouda DataFrame
"""

left_inds, right_inds = inner_join(left[on], right[on])

left_cols = left.columns.copy()
left_cols.remove(on)
right_cols = right.columns.copy()
right_cols.remove(on)

new_dict = {on: left[on][left_inds]}

for col in left_cols:
if col in right_cols:
new_col = col + left_suffix
else:
new_col = col
new_dict[new_col] = left[col][left_inds]
for col in right_cols:
if col in left_cols:
new_col = col + right_suffix
else:
new_col = col
new_dict[new_col] = right[col][right_inds]

return DataFrame(new_dict)


def right_join_merge(
left: DataFrame,
right: DataFrame,
on: str,
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
"""
Utilizes the ak.join.inner_join_merge function to return an
ak DataFrame object containing all the rows in the right Dataframe,
as well as corresponding rows in the left (based on the "on" param),
and all of their associated values.
Based on pandas merge functionality.
Parameters
----------
left: DataFrame
The Left DataFrame to be joined
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
left_suffix: str = "_x"
A string indicating the suffix to add to columns from the left dataframe for overlapping
column names in both left and right. Defaults to "_x"
right_suffix: str = "_y"
A string indicating the suffix to add to columns from the right dataframe for overlapping
column names in both left and right. Defaults to "_y"
Returns
-------
DataFrame
Right-Joined Arkouda DataFrame
"""

left_cols = left.columns.copy()
left_cols.remove(on)

in_left = inner_join_merge(left, right, on, left_suffix, right_suffix)
in_left_cols = in_left.columns.copy()
in_left_cols.remove(on)

not_in_left = right[find(setdiff1d(right[on], left[on]), right[on])]
for col in not_in_left.columns:
if col in left_cols:
new_col = col + right_suffix
not_in_left[new_col] = not_in_left[col]
not_in_left = not_in_left.drop(col, axis=1)

nan_cols = list(set(in_left) - set(in_left).intersection(set(not_in_left)))

for col in nan_cols:
# Create a nan array for all values not in the left df
nan_arr = zeros(len(not_in_left))
nan_arr.fill(np.nan)
left_col_type = type(in_left[col])
if in_left[col].dtype == int:
in_left[col] = akcast(in_left[col], akfloat64)
else:
nan_arr = akcast(nan_arr, in_left[col].dtype)

try:
not_in_left[col] = left_col_type(nan_arr)
except TypeError:
not_in_left[col] = nan_arr

right_ak_df = DataFrame.append(in_left, not_in_left)

return right_ak_df


@typechecked
def merge(
left: DataFrame,
right: DataFrame,
on: str,
how: str,
left_suffix: str = "_x",
right_suffix: str = "_y",
) -> DataFrame:
"""
Utilizes the ak.join.inner_join_merge and the ak.join.right_join_merge
functions to return a merged Arkouda DataFrame object
containing rows from both DataFrames as specified by the merge
condition (based on the "how" and "on" parameters).
Based on pandas merge functionality.
https://github.com/pandas-dev/pandas/blob/main/pandas/core/reshape/merge.py#L137
Parameters
----------
left: DataFrame
The Left DataFrame to be joined
right: DataFrame
The Right DataFrame to be joined
on: str
The name of the DataFrame column the join is being
performed on
how: str
The merge condition.
Must be "inner", "left", or "right"
left_suffix: str = "_x"
A string indicating the suffix to add to columns from the left dataframe for overlapping
column names in both left and right. Defaults to "_x". Only used when how is "inner"
right_suffix: str = "_y"
A string indicating the suffix to add to columns from the right dataframe for overlapping
column names in both left and right. Defaults to "_y". Only used when how is "inner"
Returns
-------
DataFrame
Joined Arkouda DataFrame
"""

if how == 'inner':
return inner_join_merge(left, right, on, left_suffix, right_suffix)
elif how == 'right':
return right_join_merge(left, right, on, left_suffix, right_suffix)
elif how == 'left':
return right_join_merge(right, left, on, right_suffix, left_suffix)
else:
raise ValueError(f"Unexpected value of {how} for how. Must choose: 'inner', 'left', or 'right'")
Loading

0 comments on commit f3f1de8

Please sign in to comment.